ADD Initial Code Import 91/9491/1
authorVarun Gudisena <vg411h@att.com>
Wed, 30 Aug 2017 16:34:48 +0000 (11:34 -0500)
committerVarun Gudisena <vg411h@att.com>
Wed, 30 Aug 2017 16:35:00 +0000 (11:35 -0500)
Added initial code for mirror agent

Issue-id: DMAAP-76
Change-Id: I8b4521706c4f3a96720987fb75b5b8d5cfd05ec3
Signed-off-by: Varun Gudisena <vg411h@att.com>
19 files changed:
.gitignore [new file with mode: 0644]
LICENSE [new file with mode: 0644]
pom.xml [new file with mode: 0644]
src/main/config/consumer.properties [new file with mode: 0644]
src/main/config/mmagent.config [new file with mode: 0644]
src/main/config/mmagent.config_old [new file with mode: 0644]
src/main/config/producer.properties [new file with mode: 0644]
src/main/config/template.lrm.xml [new file with mode: 0644]
src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/MirrorMakerAgent.java [new file with mode: 0644]
src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java [new file with mode: 0644]
src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java [new file with mode: 0644]
src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/ListMirrorMaker.java [new file with mode: 0644]
src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/MirrorMaker.java [new file with mode: 0644]
src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java [new file with mode: 0644]
src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateWhiteList.java [new file with mode: 0644]
src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java [new file with mode: 0644]
src/main/resources/log4j.properties [new file with mode: 0644]
src/main/scripts/mmagent [new file with mode: 0644]
src/test/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/AppTest.java [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..b83d222
--- /dev/null
@@ -0,0 +1 @@
+/target/
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..2ce945c
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * * 
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ * * 
+ *  * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..1cb4120
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,166 @@
+<!--
+     ============LICENSE_START=======================================================
+     org.onap.dmaap
+     ================================================================================
+     Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+     ================================================================================
+     Licensed under the Apache License, Version 2.0 (the "License");
+     you may not use this file except in compliance with the License.
+     You may obtain a copy of the License at
+           http://www.apache.org/licenses/LICENSE-2.0
+     
+     Unless required by applicable law or agreed to in writing, software
+     distributed under the License is distributed on an "AS IS" BASIS,
+     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     See the License for the specific language governing permissions and
+     limitations under the License.
+     ============LICENSE_END=========================================================
+   
+     ECOMP is a trademark and service mark of AT&T Intellectual Property.
+     
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <modelVersion>4.0.0</modelVersion>
+
+       <groupId>org.onap.dmaap.messagerouter.mirroragent</groupId>
+       <artifactId>dmaapMMAgent</artifactId>
+       <version>1.0.0-SNAPSHOT</version>
+       <packaging>jar</packaging>
+       
+       <parent>
+               <groupId>org.onap.oparent</groupId>
+               <artifactId>oparent</artifactId>
+               <version>1.0.0-SNAPSHOT</version>
+       </parent>
+
+       <name>dmaapMMAgent</name>
+       <description>Mirror Maker Agent - Repliaction agent</description>
+       <url>https://github.com/att/dmaap-framework</url>
+
+       <properties>
+               <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+       </properties>
+       <licenses>
+               <license>
+                       <name>Apache License Version 2.0</name>
+               </license>
+       </licenses>
+       <developers>
+               <developer>
+                       <name>Jackie</name>
+                       <email></email>
+                       <organization>ATT</organization>
+                       <organizationUrl>www.att.com</organizationUrl>
+               </developer>
+       </developers>
+
+       <dependencies>
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>3.8.1</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.code.gson</groupId>
+                       <artifactId>gson</artifactId>
+                       <version>2.6.2</version>
+               </dependency>
+               <dependency>
+                       <groupId>log4j</groupId>
+                       <artifactId>log4j</artifactId>
+                       <version>1.2.17</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.jasypt</groupId>
+                       <artifactId>jasypt</artifactId>
+                       <version>1.9.2</version>
+               </dependency>
+       </dependencies>
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-javadoc-plugin</artifactId>
+                               <version>2.10.4</version>
+                               <configuration>
+                                       <additionalparam>-Xdoclint:none</additionalparam>
+                               </configuration>
+                               <executions>
+                                       <execution>
+                                               <id>attach-javadocs</id>
+                                               <goals>
+                                                       <goal>jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-source-plugin</artifactId>
+                               <version>3.0.0</version>
+                               <executions>
+                                       <execution>
+                                               <id>attach-sources</id>
+                                               <goals>
+                                                       <goal>jar-no-fork</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.codehaus.mojo</groupId>
+                               <artifactId>cobertura-maven-plugin</artifactId>
+                               <version>2.7</version>
+                               <configuration>
+                                       <formats>
+                                               <format>html</format>
+                                               <format>xml</format>
+                                       </formats>
+                               </configuration>
+                       </plugin>
+                       <plugin>
+                               <artifactId>maven-assembly-plugin</artifactId>
+                               <version>2.4</version>
+                               <configuration>
+                                       <descriptorRefs>
+                                               <descriptorRef>jar-with-dependencies</descriptorRef>
+                                       </descriptorRefs>
+                                       <archive>
+
+                                               <manifest>
+                                                       <addClasspath>true</addClasspath>
+                                                       <mainClass>org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.MirrorMakerAgent</mainClass>
+                                               </manifest>
+                                       </archive>
+                               </configuration>
+
+                               <executions>
+                                       <execution>
+                                               <id>make-assembly</id> <!-- this is used for inheritance merges -->
+                                               <phase>package</phase> <!-- bind to the packaging phase -->
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+                       <plugin> 
+                                <groupId>org.apache.maven.plugins</groupId> 
+                                <artifactId>maven-gpg-plugin</artifactId> 
+                                    <version>1.5</version> 
+                                    <executions> 
+                                            <execution> 
+                                                    <id>sign-artifacts</id> 
+                                                    <phase>verify</phase> 
+                                                    <goals> 
+                                                       <goal>sign</goal> 
+                                                    </goals> 
+                                         </execution> 
+                                       </executions> 
+                  </plugin>
+
+               </plugins>
+       </build>
+</project>
diff --git a/src/main/config/consumer.properties b/src/main/config/consumer.properties
new file mode 100644 (file)
index 0000000..08d29af
--- /dev/null
@@ -0,0 +1,50 @@
+###############################################################################
+#  ============LICENSE_START=======================================================
+#  org.onap.dmaap
+#  ================================================================================
+#  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+#  ================================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#        http://www.apache.org/licenses/LICENSE-2.0
+#  
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=========================================================
+#
+#  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#  
+###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.consumer.ConsumerConfig for more details
+
+# Zookeeper connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zookeeper.connect=172.18.0.1:2181
+
+# timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=6000
+
+#consumer group id
+group.id=test-consumer-group
+
+#consumer timeout
+#consumer.timeout.ms=5000
diff --git a/src/main/config/mmagent.config b/src/main/config/mmagent.config
new file mode 100644 (file)
index 0000000..9e43eae
--- /dev/null
@@ -0,0 +1,7 @@
+#kafkahome=C:/dev/att/kafka_2.10-0.8.2.1
+kafkahome=/opt/
+topicURL=http://172.18.0.1:3904
+#topicname=org.onap.dmaap.messagerouter.mirroragent.agenttest
+topicname=org.openecomp.dmaapBC.mmatopic
+mechid=dgl@openecomp.org
+password=ecomp_admin
\ No newline at end of file
diff --git a/src/main/config/mmagent.config_old b/src/main/config/mmagent.config_old
new file mode 100644 (file)
index 0000000..a0b80df
--- /dev/null
@@ -0,0 +1,5 @@
+kafkahome=/opt/app/dmaap/msgrtr/kafka
+topicURL=http://<mr_host>:3904
+topicname=org.onap.dmaap.messagerouter.mirroragent.agenttest
+mechid=<aaf_id>
+password=<aaf_password>
\ No newline at end of file
diff --git a/src/main/config/producer.properties b/src/main/config/producer.properties
new file mode 100644 (file)
index 0000000..30df665
--- /dev/null
@@ -0,0 +1,74 @@
+###############################################################################
+#  ============LICENSE_START=======================================================
+#  org.onap.dmaap
+#  ================================================================================
+#  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+#  ================================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#        http://www.apache.org/licenses/LICENSE-2.0
+#  
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=========================================================
+#
+#  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#  
+###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.producer.ProducerConfig for more details
+
+############################# Producer Basics #############################
+
+# list of brokers used for bootstrapping knowledge about the rest of the cluster
+# format: host1:port1,host2:port2 ...
+metadata.broker.list=104.130.132.211:9092
+
+# name of the partitioner class for partitioning events; default partition spreads data randomly
+#partitioner.class=
+
+# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
+producer.type=sync
+
+# specify the compression codec for all data generated: none, gzip, snappy, lz4.
+# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
+compression.codec=none
+
+# message encoder
+serializer.class=kafka.serializer.DefaultEncoder
+
+# allow topic level compression
+#compressed.topics=
+
+############################# Async Producer #############################
+# maximum time, in milliseconds, for buffering data on the producer queue 
+#queue.buffering.max.ms=
+
+# the maximum size of the blocking queue for buffering on the producer 
+#queue.buffering.max.messages=
+
+# Timeout for event enqueue:
+# 0: events will be enqueued immediately or dropped if the queue is full
+# -ve: enqueue will block indefinitely if the queue is full
+# +ve: enqueue will block up to this many milliseconds if the queue is full
+#queue.enqueue.timeout.ms=
+
+# the number of messages batched at the producer 
+#batch.num.messages=
diff --git a/src/main/config/template.lrm.xml b/src/main/config/template.lrm.xml
new file mode 100644 (file)
index 0000000..a36cf2c
--- /dev/null
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+     ============LICENSE_START=======================================================
+     org.onap.dmaap
+     ================================================================================
+     Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+     ================================================================================
+     Licensed under the Apache License, Version 2.0 (the "License");
+     you may not use this file except in compliance with the License.
+     You may obtain a copy of the License at
+           http://www.apache.org/licenses/LICENSE-2.0
+     
+     Unless required by applicable law or agreed to in writing, software
+     distributed under the License is distributed on an "AS IS" BASIS,
+     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     See the License for the specific language governing permissions and
+     limitations under the License.
+     ============LICENSE_END=========================================================
+   
+     ECOMP is a trademark and service mark of AT&T Intellectual Property.
+     
+ -->
+
+<ns2:ManagedResourceList xmlns="http://scld.att.com/lrm/commontypes"
+       xmlns:ns2="http://scld.att.com/lrm/util" xmlns:ns3="http://scld.att.com/lrm/types">
+       <ns2:ManagedResource>
+               <ResourceDescriptor>
+                       <ResourceName>__SOA_CLOUD_NAMESPACE__.${artifactId}</ResourceName>
+                       <ResourceVersion>
+                               <Major>__MAJOR_VERSION__</Major>
+                               <Minor>__MINOR_VERSION__</Minor>
+                               <Patch>__PATCH_VERSION__</Patch>
+                       </ResourceVersion>
+                       
+               </ResourceDescriptor>
+               <ResourceType>Java</ResourceType>
+               <ResourceContainerType></ResourceContainerType>
+               <ResourceContainerTypeVendor>ATT</ResourceContainerTypeVendor>
+               <ResourcePath>/opt/app/dmaap/mmagent</ResourcePath>
+               <ResourceProps>
+                       <Tag>process.path</Tag>
+                       <Value>/usr/bin:/usr/sbin:${PATH}</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>process.workdir</Tag>
+                       <Value>/opt/app/dmaap/mmagent</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>process.libpath</Tag>
+                       <Value>${LD_LIBRARY_PATH}</Value>
+               </ResourceProps>
+
+               <!-- The following process.args are for your context, port, and sslport. 
+                       NOTE: The "context" argument will set your context. context=/ will run your service under a "root" context. 
+                       Port selection is NOT necessary and should NOT be hardcoded if you are choosing to utilize ephemeral ports.
+                       If NO port values are given, the AJSC will default to running on ephemeral ports (for both your http port 
+                       AND your https port), and your port will be selected for you. -->
+               
+               <!-- Add JMX port -->
+               
+               <ResourceProps>
+                       <Tag>jmx.port</Tag>
+                       <Value>__JMX_PORT_MRAGENT__</Value>
+               </ResourceProps>
+               
+               
+               <!-- End -->
+               
+               <ResourceProps>
+                       <Tag>jvm.version</Tag>
+                       <Value>__JAVA_VERSION__</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>jvm.classpath</Tag>
+                       <Value>:.:${CLASSPATH}:/opt/app/dmaap/mmagent/etc:/opt/app/dmaap/mmagent/lib/*:</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>jvm.args.pre</Tag>
+                       <Value>__PRE_JVM_ARGS__ -XX:MaxPermSize=__MAX_PERM_SIZE__
+                               -XX:PermSize=__PERM_SIZE__
+                               __INTROSCOPE_VARS__
+                               -Djava.net.preferIPv4Stack=true 
+                               -DMMAGENTHOME=/opt/app/dmaap/mmagent
+                               __POST_JVM_ARGS__
+                               __SCLD_OPTIONAL_PLATFORM_FLAG__
+                                -DMMAGENTHOME=/opt/app/dmaap/mmagent
+                       </Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>jvm.heap.min</Tag>
+                       <Value>__MIN_HEAP_SIZE__</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>jvm.heap.max</Tag>
+                       <Value>__MAX_HEAP_SIZE__</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>start.class</Tag>
+                       <Value>org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.MirrorMakerAgent</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>stdout.redirect</Tag>
+                       <Value>log/stdout.log</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>stderr.redirect</Tag>
+                       <Value>log/stdout.log</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>validatePID.waitime.seconds</Tag>
+                       <Value>__LRM_VALIDATEPID_WAITTIME_SECONDS__</Value>
+               </ResourceProps>
+               <ResourceProps>
+                       <Tag>mbean.name</Tag>
+                       
+                       <Value>JmxInterface:type=DME2</Value>
+               </ResourceProps>
+               <ResourceOSID>msgrtr</ResourceOSID>
+               <ResourceStartType>__LRM_RESOURCE_START_TYPE__</ResourceStartType>
+               <ResourceStartPriority>__LRM_START_PRIORITY__</ResourceStartPriority>
+               <ResourceStartTimeout>__LRM_START_TIMEOUT__</ResourceStartTimeout>
+               <ResourceMinCount>__RESOURCE_MIN_COUNT__</ResourceMinCount>
+               <ResourceMaxCount>__RESOURCE_MAX_COUNT__</ResourceMaxCount>
+               <ResourceMaxRestart>__LRM_RESOURCE_MAX_RESTART__</ResourceMaxRestart>
+               <ResourceHeartBeat>__LRM_RESOURCE_HEARTBEAT__</ResourceHeartBeat>
+               <ResourceHeartBeatFailedLimit>__LRM_RESOURCE_HEARTBEAT_FAILED_LIMIT__</ResourceHeartBeatFailedLimit>
+               <ResourceHeartBeatTimeout>__LRM_RESOURCE_HEARTBEAT_TIMEOUT__</ResourceHeartBeatTimeout>
+               <ResourceShutdownWaitTimeInSecs>__RESOURCE_MANAGER_WAIT_TIME_IN_SECONDS__</ResourceShutdownWaitTimeInSecs>
+               <ResourceRegistration>__LRM_RESOURCE_REGISTRATION__</ResourceRegistration>
+               <GroupName>dmaap</GroupName>
+               <ResourceErrorNotify>
+                       <NotifyListEntry>
+                               <Loglevel>WARNING</Loglevel>
+                               <EmailList>__CLDLRM_WARNING_NOTIFY__</EmailList>
+                       </NotifyListEntry>
+                       <NotifyListEntry>
+                               <Loglevel>SEVERE</Loglevel>
+                               <EmailList>__CLDLRM_SEVERE_NOTIFY__</EmailList>
+                       </NotifyListEntry>
+               </ResourceErrorNotify>
+       </ns2:ManagedResource>
+</ns2:ManagedResourceList>
diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/MirrorMakerAgent.java
new file mode 100644 (file)
index 0000000..b6644b6
--- /dev/null
@@ -0,0 +1,588 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+import org.jasypt.util.text.BasicTextEncryptor;
+import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.CreateMirrorMaker;
+import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.DeleteMirrorMaker;
+import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.ListMirrorMaker;
+import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.MirrorMaker;
+import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.UpdateMirrorMaker;
+import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.UpdateWhiteList;
+import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler;
+
+import com.google.gson.Gson;
+import com.google.gson.internal.LinkedTreeMap;
+import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny;
+import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
+
+public class MirrorMakerAgent {
+       static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
+       Properties mirrorMakerProperties = new Properties();
+       ListMirrorMaker mirrorMakers = null;
+       String mmagenthome = "";
+       String kafkahome = "";
+       String topicURL = "";
+       String topicname = "";
+       String mechid = "";
+       String password = "";
+       private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
+
+       public static void main(String[] args) {
+               if (args != null && args.length == 2) {
+                       if (args[0].equals("-encrypt")) {
+                               BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
+                               textEncryptor.setPassword(secret);
+                               String plainText = textEncryptor.encrypt(args[1]);
+                               System.out.println("Encrypted Password is :" + plainText);
+                               return;
+                       }
+               } else if (args != null && args.length > 0) {
+                       System.out.println(
+                                       "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
+                       return;
+               }
+               MirrorMakerAgent agent = new MirrorMakerAgent();
+               if (agent.checkStartup()) {
+                       logger.info("mmagent started, loading properties");
+                       agent.checkAgentProcess();
+                       agent.readAgentTopic();
+               } else {
+                       System.out.println(
+                                       "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
+               }
+       }
+
+       private boolean checkStartup() {
+               FileInputStream input = null;
+               try {
+                       this.mmagenthome = System.getProperty("MMAGENTHOME");
+                       input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
+                       logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
+               } catch (IOException ex) {
+                       logger.error(mmagenthome + "/etc/mmagent.config not found.  Set -DMMAGENTHOME and check the config file");
+                       return false;
+               } finally {
+                       if (input != null) {
+                               try {
+                                       input.close();
+                               } catch (IOException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+               }
+               loadProperties();
+               input = null;
+               try {
+                       input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
+                       logger.info("kakahome is set :" + kafkahome);
+               } catch (IOException ex) {
+                       logger.error(kafkahome + "/bin/kafka-run-class.sh not found.  Make sure kafka home is set correctly");
+                       return false;
+               } finally {
+                       if (input != null) {
+                               try {
+                                       input.close();
+                               } catch (IOException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+               }
+               String response = publishTopic("{\"test\":\"test\"}");
+               if (response.startsWith("ERROR:")) {
+                       logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
+                                       + this.topicURL + " Error is:  " + response);
+                       return false;
+               }
+               logger.info("Published to Topic :" + this.topicname + " Successfully");
+               response = subscribeTopic("1");
+               if (response != null && response.startsWith("ERROR:")) {
+                       logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
+                                       + this.topicURL + " Error is:  " + response);
+                       return false;
+               }
+               logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
+               return true;
+       }
+
+       private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) {
+               InputStream input = null;
+               OutputStream out = null;
+               try {
+                       if (refresh) {
+                               throw new IOException();
+                       }
+                       input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
+               } catch (IOException ex) {
+                       try {
+                               input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
+                               Properties prop = new Properties();
+                               prop.load(input);
+                               if (propName.equals("consumer")) {
+                                       prop.setProperty("group.id", agentName);
+                                       prop.setProperty("zookeeper.connect", info);
+                               } else {
+                                       prop.setProperty("metadata.broker.list", info);
+                               }
+                               out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
+                               prop.store(out, "");
+
+                       } catch (Exception e) {
+                               e.printStackTrace();
+                       }
+               } finally {
+                       if (input != null) {
+                               try {
+                                       input.close();
+                               } catch (IOException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+                       if (out != null) {
+                               try {
+                                       out.close();
+                               } catch (IOException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+               }
+       }
+
+       private void checkAgentProcess() {
+               logger.info("Checking MirrorMaker Process");
+               if (mirrorMakers != null) {
+                       int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+                       for (int i = 0; i < mirrorMakersCount; i++) {
+                               MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+                               if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) {
+                                       checkPropertiesFile(mm.name, "consumer", mm.consumer, false);
+                                       checkPropertiesFile(mm.name, "producer", mm.producer, false);
+
+                                       if (mm.whitelist != null && !mm.whitelist.equals("")) {
+                                               logger.info("MirrorMaker " + mm.name + " is not running, restarting.  Check Logs for more Details");
+                                               MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
+                                                               mmagenthome + "/etc/" + mm.name + "consumer.properties",
+                                                               mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist);
+                                               mm.setStatus("RESTARTING");
+
+                                       } else {
+                                               logger.info("MirrorMaker " + mm.name + " is STOPPED");
+                                               mm.setStatus("STOPPED");
+                                       }
+                                       try {
+                                               Thread.sleep(1000);
+                                       } catch (InterruptedException e) {
+                                       }
+                                       mirrorMakers.getListMirrorMaker().set(i, mm);
+                               } else {
+                                       logger.info("MirrorMaker " + mm.name + " is running");
+                                       mm.setStatus("RUNNING");
+                                       mirrorMakers.getListMirrorMaker().set(i, mm);
+                               }
+                       }
+               }
+               // Gson g = new Gson();
+               // System.out.println(g.toJson(mirrorMakers));
+       }
+
+       private String subscribeTopic(String timeout) {
+               String response = "";
+               try {
+                       String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout
+                                       + "&limit=1";
+                       String authString = this.mechid + ":" + this.password;
+                       String authStringEnc = Base64.encode(authString.getBytes());
+                       URL url = new URL(requestURL);
+                       HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+                       connection.setRequestMethod("GET");
+                       connection.setDoOutput(true);
+                       connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
+                       connection.setRequestProperty("Content-Type", "application/json");
+                       InputStream content = (InputStream) connection.getInputStream();
+                       BufferedReader in = new BufferedReader(new InputStreamReader(content));
+                       String line;
+
+                       while ((line = in.readLine()) != null) {
+                               response = response + line;
+                       }
+                       Gson g = new Gson();
+                       // get message as JSON String Array
+                       String[] topicMessage = g.fromJson(response, String[].class);
+                       if (topicMessage.length != 0) {
+                               return topicMessage[0];
+                       }
+               } catch (Exception e) {
+                       return "ERROR:" + e.getMessage() + " Server Response is:" + response;
+               }
+               return null;
+       }
+
+       private String publishTopic(String message) {
+               try {
+                       String requestURL = this.topicURL + "/events/" + this.topicname;
+                       String authString = this.mechid + ":" + this.password;
+                       String authStringEnc = Base64.encode(authString.getBytes());
+                       URL url = new URL(requestURL);
+                       HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+                       connection.setRequestMethod("POST");
+                       connection.setDoOutput(true);
+                       connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
+                       connection.setRequestProperty("Content-Type", "application/json");
+                       connection.setRequestProperty("Content-Length", Integer.toString(message.length()));
+                       DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
+                       wr.write(message.getBytes());
+
+                       InputStream content = (InputStream) connection.getInputStream();
+                       BufferedReader in = new BufferedReader(new InputStreamReader(content));
+                       String line;
+                       String response = "";
+                       while ((line = in.readLine()) != null) {
+                               response = response + line;
+                       }
+                       return response;
+
+               } catch (Exception e) {
+                       return "ERROR:" + e.getLocalizedMessage();
+               }
+       }
+
+       private void readAgentTopic() {
+               try {
+                       int connectionattempt = 0;
+                       while (true) {
+                               logger.info("--------------------------------");
+                               logger.info("Waiting for Messages for 60 secs");
+                               String topicMessage = subscribeTopic("60000");
+                               Gson g = new Gson();
+                               LinkedTreeMap<?, ?> object = null;
+                               if (topicMessage != null) {
+                                       try {
+                                               object = g.fromJson(topicMessage, LinkedTreeMap.class);
+
+                                               // Cast the 1st item (since limit=1 and see the type of
+                                               // object
+                                               if (object.get("createMirrorMaker") != null) {
+                                                       logger.info("Received createMirrorMaker request from topic");
+                                                       CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
+                                                       createMirrorMaker(m.getCreateMirrorMaker());
+                                                       checkAgentProcess();
+                                                       mirrorMakers.setMessageID(m.getMessageID());
+                                                       publishTopic(g.toJson(mirrorMakers));
+                                                       mirrorMakers.setMessageID("");
+                                               } else if (object.get("updateMirrorMaker") != null) {
+                                                       logger.info("Received updateMirrorMaker request from topic");
+                                                       UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
+                                                       updateMirrorMaker(m.getUpdateMirrorMaker());
+                                                       checkAgentProcess();
+                                                       mirrorMakers.setMessageID(m.getMessageID());
+                                                       publishTopic(g.toJson(mirrorMakers));
+                                                       mirrorMakers.setMessageID("");
+                                               } else if (object.get("deleteMirrorMaker") != null) {
+                                                       logger.info("Received deleteMirrorMaker request from topic");
+                                                       DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
+                                                       deleteMirrorMaker(m.getDeleteMirrorMaker());
+                                                       checkAgentProcess();
+                                                       mirrorMakers.setMessageID(m.getMessageID());
+                                                       publishTopic(g.toJson(mirrorMakers));
+                                                       mirrorMakers.setMessageID("");
+                                               } else if (object.get("listAllMirrorMaker") != null) {
+                                                       logger.info("Received listALLMirrorMaker request from topic");
+                                                       checkAgentProcess();
+                                                       mirrorMakers.setMessageID((String) object.get("messageID"));
+                                                       publishTopic(g.toJson(mirrorMakers));
+                                                       mirrorMakers.setMessageID("");
+                                               } else if (object.get("updateWhiteList") != null) {
+                                                       logger.info("Received updateWhiteList request from topic");
+                                                       UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
+                                                       updateWhiteList(m.getUpdateWhiteList());
+                                                       checkAgentProcess();
+                                                       mirrorMakers.setMessageID(m.getMessageID());
+                                                       publishTopic(g.toJson(mirrorMakers));
+                                                       mirrorMakers.setMessageID("");
+                                               } else if (object.get("listMirrorMaker") != null) {
+                                                       logger.info("Received listMirrorMaker from topic, skipping messages");
+                                               } else {
+                                                       logger.info("Received unknown request from topic");
+                                               }
+                                       } catch (Exception ex) {
+                                               connectionattempt++;
+                                               if (connectionattempt > 5) {
+                                                       logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
+                                                       return;
+                                               }
+                                               logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
+                                                               + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
+                                               Thread.sleep(60000);
+                                       }
+                               } else {
+                                       // Check all MirrorMaker every min
+                                       connectionattempt = 0;
+                                       checkAgentProcess();
+                               }
+
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+
+       }
+
+       private void createMirrorMaker(MirrorMaker newMirrorMaker) {
+               boolean exists = false;
+               if (mirrorMakers != null) {
+                       int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+                       for (int i = 0; i < mirrorMakersCount; i++) {
+                               MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+                               if (mm.name.equals(newMirrorMaker.name)) {
+                                       exists = true;
+                                       logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
+                                       return;
+                               }
+                       }
+               }
+               logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
+               if (exists == false && mirrorMakers != null) {
+                       mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
+               } else if (exists == false && mirrorMakers == null) {
+                       mirrorMakers = new ListMirrorMaker();
+                       ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
+                       list = new ArrayList<MirrorMaker>();
+                       list.add(newMirrorMaker);
+                       mirrorMakers.setListMirrorMaker(list);
+               }
+               checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
+               checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
+
+               Gson g = new Gson();
+               mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
+               OutputStream out = null;
+               try {
+                       out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
+                       mirrorMakerProperties.store(out, "");
+               } catch (IOException ex) {
+                       ex.printStackTrace();
+               } finally {
+                       if (out != null) {
+                               try {
+                                       out.close();
+                               } catch (IOException e) {
+                                       e.printStackTrace();
+                               }
+                       }
+               }
+       }
+
+       private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
+               boolean exists = false;
+               if (mirrorMakers != null) {
+                       int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+                       for (int i = 0; i < mirrorMakersCount; i++) {
+                               MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+                               if (mm.name.equals(newMirrorMaker.name)) {
+                                       exists = true;
+                                       mm.setConsumer(newMirrorMaker.getConsumer());
+                                       mm.setProducer(newMirrorMaker.getProducer());
+                                       mirrorMakers.getListMirrorMaker().set(i, mm);
+                                       logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
+                               }
+                       }
+               }
+               if (exists) {
+                       checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
+                       checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
+
+                       Gson g = new Gson();
+                       mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
+                       OutputStream out = null;
+                       try {
+                               out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
+                               mirrorMakerProperties.store(out, "");
+                               MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException e) {
+                               }
+                       } catch (IOException ex) {
+                               ex.printStackTrace();
+                       } finally {
+                               if (out != null) {
+                                       try {
+                                               out.close();
+                                       } catch (IOException e) {
+                                               e.printStackTrace();
+                                       }
+                               }
+                       }
+               } else {
+                       logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
+               }
+       }
+
+       private void updateWhiteList(MirrorMaker newMirrorMaker) {
+               boolean exists = false;
+               if (mirrorMakers != null) {
+                       int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+                       for (int i = 0; i < mirrorMakersCount; i++) {
+                               MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+                               if (mm.name.equals(newMirrorMaker.name)) {
+                                       exists = true;
+                                       mm.setWhitelist(newMirrorMaker.whitelist);
+                                       mirrorMakers.getListMirrorMaker().set(i, mm);
+                                       logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
+                                                       + newMirrorMaker.whitelist);
+                               }
+                       }
+               }
+               if (exists) {
+                       Gson g = new Gson();
+                       mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
+                       OutputStream out = null;
+                       try {
+                               out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
+                               mirrorMakerProperties.store(out, "");
+                               MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
+                               try {
+                                       Thread.sleep(1000);
+                               } catch (InterruptedException e) {
+                               }
+                       } catch (IOException ex) {
+                               ex.printStackTrace();
+                       } finally {
+                               if (out != null) {
+                                       try {
+                                               out.close();
+                                       } catch (IOException e) {
+                                               e.printStackTrace();
+                                       }
+                               }
+                       }
+               } else {
+                       logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
+               }
+       }
+
+       private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
+               boolean exists = false;
+               if (mirrorMakers != null) {
+                       int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+                       for (int i = 0; i < mirrorMakersCount; i++) {
+                               MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+                               if (mm.name.equals(newMirrorMaker.name)) {
+                                       exists = true;
+                                       mirrorMakers.getListMirrorMaker().remove(i);
+                                       logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
+                                       i = mirrorMakersCount;
+                               }
+                       }
+               }
+               if (exists) {
+                       try {
+                               String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
+                               File file = new File(path);
+                               file.delete();
+                       } catch (Exception ex) {
+                       }
+                       try {
+                               String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
+                               File file = new File(path);
+                               file.delete();
+                       } catch (Exception ex) {
+                       }
+                       Gson g = new Gson();
+                       mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
+                       OutputStream out = null;
+                       try {
+                               out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
+                               mirrorMakerProperties.store(out, "");
+                               MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
+                       } catch (IOException ex) {
+                               ex.printStackTrace();
+                       } finally {
+                               if (out != null) {
+                                       try {
+                                               out.close();
+                                       } catch (IOException e) {
+                                               e.printStackTrace();
+                                       }
+                               }
+                       }
+               } else {
+                       logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
+               }
+       }
+
+       private void loadProperties() {
+               InputStream input = null;
+               try {
+
+                       input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
+                       mirrorMakerProperties.load(input);
+                       Gson g = new Gson();
+                       if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
+                               this.mirrorMakers = new ListMirrorMaker();
+                               ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
+                               list = new ArrayList<MirrorMaker>();
+                               this.mirrorMakers.setListMirrorMaker(list);
+                       } else {
+                               this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
+                                               ListMirrorMaker.class);
+                       }
+
+                       this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
+                       this.topicURL = mirrorMakerProperties.getProperty("topicURL");
+                       this.topicname = mirrorMakerProperties.getProperty("topicname");
+                       this.mechid = mirrorMakerProperties.getProperty("mechid");
+
+                       BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
+                       textEncryptor.setPassword(secret);
+                       //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
+                       this.password = mirrorMakerProperties.getProperty("password");
+               } catch (IOException ex) {
+                       // ex.printStackTrace();
+               } finally {
+                       if (input != null) {
+                               try {
+                                       input.close();
+                               } catch (IOException e) {
+                                       // e.printStackTrace();
+                               }
+                       }
+               }
+
+       }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java
new file mode 100644 (file)
index 0000000..bf1207a
--- /dev/null
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao;
+
+public class CreateMirrorMaker {
+       String messageID;
+       MirrorMaker createMirrorMaker;
+
+       public MirrorMaker getCreateMirrorMaker() {
+               return createMirrorMaker;
+       }
+
+       public void setCreateMirrorMaker(MirrorMaker createMirrorMaker) {
+               this.createMirrorMaker = createMirrorMaker;
+       }
+
+       public String getMessageID() {
+               return messageID;
+       }
+
+       public void setMessageID(String messageID) {
+               this.messageID = messageID;
+       }
+
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java
new file mode 100644 (file)
index 0000000..dcabff6
--- /dev/null
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao;
+
+public class DeleteMirrorMaker {
+       String messageID;
+       MirrorMaker deleteMirrorMaker;
+
+       public MirrorMaker getDeleteMirrorMaker() {
+               return deleteMirrorMaker;
+       }
+
+       public void setDeleteMirrorMaker(MirrorMaker deleteMirrorMaker) {
+               this.deleteMirrorMaker = deleteMirrorMaker;
+       }
+
+       public String getMessageID() {
+               return messageID;
+       }
+
+       public void setMessageID(String messageID) {
+               this.messageID = messageID;
+       }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/ListMirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/ListMirrorMaker.java
new file mode 100644 (file)
index 0000000..56953be
--- /dev/null
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao;
+
+import java.util.ArrayList;
+
+public class ListMirrorMaker {
+       String messageID;
+       ArrayList<MirrorMaker> listMirrorMaker;
+
+       public ArrayList<MirrorMaker> getListMirrorMaker() {
+               return listMirrorMaker;
+       }
+
+       public void setListMirrorMaker(ArrayList<MirrorMaker> createMirrorMaker) {
+               this.listMirrorMaker = createMirrorMaker;
+       }
+
+       public String getMessageID() {
+               return messageID;
+       }
+
+       public void setMessageID(String messageID) {
+               this.messageID = messageID;
+       }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/MirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/MirrorMaker.java
new file mode 100644 (file)
index 0000000..d496aea
--- /dev/null
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao;
+
+public class MirrorMaker {
+       public String name;
+       public String consumer;
+       public String producer;
+       public String whitelist;
+       public String status;
+
+       public String getStatus() {
+               return status;
+       }
+
+       public void setStatus(String status) {
+               this.status = status;
+       }
+
+       public String getName() {
+               return name;
+       }
+
+       public void setName(String name) {
+               this.name = name;
+       }
+
+       public String getConsumer() {
+               return consumer;
+       }
+
+       public void setConsumer(String consumer) {
+               this.consumer = consumer;
+       }
+
+       public String getProducer() {
+               return producer;
+       }
+
+       public void setProducer(String producer) {
+               this.producer = producer;
+       }
+
+       public String getWhitelist() {
+               return whitelist;
+       }
+
+       public void setWhitelist(String whitelist) {
+               this.whitelist = whitelist;
+       }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java
new file mode 100644 (file)
index 0000000..d78054f
--- /dev/null
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao;
+
+public class UpdateMirrorMaker {
+       String messageID;
+       MirrorMaker updateMirrorMaker;
+
+       public MirrorMaker getUpdateMirrorMaker() {
+               return updateMirrorMaker;
+       }
+
+       public void setUpdateMirrorMaker(MirrorMaker updateMirrorMaker) {
+               this.updateMirrorMaker = updateMirrorMaker;
+       }
+
+       public String getMessageID() {
+               return messageID;
+       }
+
+       public void setMessageID(String messageID) {
+               this.messageID = messageID;
+       }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateWhiteList.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateWhiteList.java
new file mode 100644 (file)
index 0000000..215fb34
--- /dev/null
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao;
+
+public class UpdateWhiteList {
+       String messageID;
+       MirrorMaker updateWhiteList;
+
+       public MirrorMaker getUpdateWhiteList() {
+               return updateWhiteList;
+       }
+
+       public void setUpdateWhiteList(MirrorMaker updateWhiteList) {
+               this.updateWhiteList = updateWhiteList;
+       }
+
+       public String getMessageID() {
+               return messageID;
+       }
+
+       public void setMessageID(String messageID) {
+               this.messageID = messageID;
+       }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java
new file mode 100644 (file)
index 0000000..05c81be
--- /dev/null
@@ -0,0 +1,154 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.utils;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.log4j.Logger;
+import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.MirrorMakerAgent;
+
+public class MirrorMakerProcessHandler {
+       static final Logger logger = Logger.getLogger(MirrorMakerProcessHandler.class);
+
+       public static boolean checkMirrorMakerProcess(String agentname) {
+               try {
+                       Runtime rt = Runtime.getRuntime();
+                       Process mmprocess = null;
+
+                       if (System.getProperty("os.name").contains("Windows")) {
+                               String args = "";
+                               args = "wmic.exe process where \"commandline like '%agentname=" + agentname
+                                               + "~%' and caption='java.exe'\"";
+                               mmprocess = rt.exec(args);
+                       } else {
+                               String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" };
+                               mmprocess = rt.exec(args);
+                       }
+
+                       InputStream is = mmprocess.getInputStream();
+                       InputStreamReader isr = new InputStreamReader(is);
+                       BufferedReader br = new BufferedReader(isr);
+                       String line;
+                       while ((line = br.readLine()) != null) {
+                               // System.out.println(line);
+                               if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) {
+                                       return true;
+                               }
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+               return false;
+       }
+
+       public static void stopMirrorMaker(String agentname) {
+               try {
+                       Runtime rt = Runtime.getRuntime();
+                       Process killprocess = null;
+
+                       if (System.getProperty("os.name").contains("Windows")) {
+                               String args = "wmic.exe process where \"commandline like '%agentname=" + agentname
+                                               + "~%' and caption='java.exe'\" call terminate";
+                               killprocess = rt.exec(args);
+                       } else {
+                               String args[] = { "/bin/sh", "-c",
+                                               "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" };
+                               // args = "kill $(ps -ef |grep java |grep agentname=" +
+                               // agentname + "~| awk '{print $2}')";
+                               killprocess = rt.exec(args);
+                       }
+
+                       InputStream is = killprocess.getInputStream();
+                       InputStreamReader isr = new InputStreamReader(is);
+                       BufferedReader br = new BufferedReader(isr);
+                       String line;
+                       while ((line = br.readLine()) != null) {
+                               // System.out.println(line);
+                       }
+
+                       logger.info("Mirror Maker " + agentname + " Stopped");
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+
+       }
+
+       public static void startMirrorMaker(String mmagenthome, String kafkaHome, String agentName, String consumerConfig,
+                       String producerConfig, String whitelist) {
+               try {
+                       Runtime rt = Runtime.getRuntime();
+
+                       if (System.getProperty("os.name").contains("Windows")) {
+                               String args = kafkaHome + "/bin/windows/kafka-run-class.bat -Dagentname=" + agentName
+                                               + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig + " --producer.config "
+                                               + producerConfig + " --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName
+                                               + "_MMaker.log";
+                               final Process process = rt.exec(args);
+                               new Thread() {
+                                       public void run() {
+                                               try {
+                                                       InputStream is = process.getInputStream();
+                                                       InputStreamReader isr = new InputStreamReader(is);
+                                                       BufferedReader br = new BufferedReader(isr);
+                                                       String line;
+                                                       while ((line = br.readLine()) != null) {
+                                                               // System.out.println(line);
+                                                       }
+                                               } catch (Exception anExc) {
+                                                       anExc.printStackTrace();
+                                               }
+                                       }
+                               }.start();
+                       } else {
+                               String args[] = { "/bin/sh", "-c",
+                                               kafkaHome + "/bin/kafka-run-class.sh -Dagentname=" + agentName
+                                                               + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig
+                                                               + " --producer.config " + producerConfig + " --whitelist '" + whitelist + "' >"
+                                                               + mmagenthome + "/logs/" + agentName + "_MMaker.log 2>&1" };
+                               final Process process = rt.exec(args);
+                               new Thread() {
+                                       public void run() {
+                                               try {
+                                                       InputStream is = process.getInputStream();
+                                                       InputStreamReader isr = new InputStreamReader(is);
+                                                       BufferedReader br = new BufferedReader(isr);
+                                                       String line;
+                                                       while ((line = br.readLine()) != null) {
+                                                               // System.out.println(line);
+                                                       }
+                                               } catch (Exception anExc) {
+                                                       anExc.printStackTrace();
+                                               }
+                                       }
+                               }.start();
+                       }
+
+                       logger.info("Mirror Maker " + agentName + " Started" + " WhiteListing:" + whitelist);
+
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+       }
+}
diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties
new file mode 100644 (file)
index 0000000..a0ac24f
--- /dev/null
@@ -0,0 +1,37 @@
+###############################################################################
+#  ============LICENSE_START=======================================================
+#  org.onap.dmaap
+#  ================================================================================
+#  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+#  ================================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#        http://www.apache.org/licenses/LICENSE-2.0
+#  
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=========================================================
+#
+#  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#  
+###############################################################################
+# Root logger option
+log4j.rootLogger=INFO, stdout, file
+
+# Redirect log messages to console
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p - %m %n
+
+# Redirect log messages to a log file, support file rolling.
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File= ${MMAGENTHOME}/logs/mmagent.log
+log4j.appender.file.MaxFileSize=5MB
+log4j.appender.file.MaxBackupIndex=10
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p - %m  %n
\ No newline at end of file
diff --git a/src/main/scripts/mmagent b/src/main/scripts/mmagent
new file mode 100644 (file)
index 0000000..18a75ea
--- /dev/null
@@ -0,0 +1,17 @@
+#!/bin/sh
+
+JAVA_HOMES="${INSTALL_ROOT}/opt/app/java/jdk/jdk170 ${INSTALL_ROOT}/opt/app/java/jdk/jdk160"
+for jhome in ${JAVA_HOMES}; do
+   if [ -x "${jhome}"/bin/java ]; then
+      export JAVA_HOME=${jhome}
+   fi
+done
+
+ROOT_DIR=`dirname $0`/..
+ROOT_DIR=`cd $ROOT_DIR; pwd`
+CLASSPATH=${ROOT_DIR}'/lib/*'
+PATH=${JAVA_HOME}/bin:${PATH}
+export JAVA_HOME CLASSPATH PATH
+
+exec java -DMMAGENTHOME=$ROOT_DIR com.att.nsa.dmaapMMAgent.MirrorMakerAgent "$@"
+
diff --git a/src/test/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/AppTest.java b/src/test/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/AppTest.java
new file mode 100644 (file)
index 0000000..19a48b8
--- /dev/null
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+/**
+ * Unit test for simple App.
+ */
+public class AppTest 
+    extends TestCase
+{
+    /**
+     * Create the test case
+     *
+     * @param testName name of the test case
+     */
+    public AppTest( String testName )
+    {
+        super( testName );
+    }
+
+    /**
+     * @return the suite of tests being tested
+     */
+    public static Test suite()
+    {
+        return new TestSuite( AppTest.class );
+    }
+
+    /**
+     * Rigourous Test :-)
+     */
+    public void testApp()
+    {
+        assertTrue( true );
+    }
+}