From 2b4d7eb39b0ceb182467d564ab457a61ce873ed3 Mon Sep 17 00:00:00 2001 From: Varun Gudisena Date: Wed, 30 Aug 2017 11:34:48 -0500 Subject: [PATCH] ADD Initial Code Import Added initial code for mirror agent Issue-id: DMAAP-76 Change-Id: I8b4521706c4f3a96720987fb75b5b8d5cfd05ec3 Signed-off-by: Varun Gudisena --- .gitignore | 1 + LICENSE | 22 + pom.xml | 166 ++++++ src/main/config/consumer.properties | 50 ++ src/main/config/mmagent.config | 7 + src/main/config/mmagent.config_old | 5 + src/main/config/producer.properties | 74 +++ src/main/config/template.lrm.xml | 142 +++++ .../nsa/dmaapMMAgent/MirrorMakerAgent.java | 588 +++++++++++++++++++++ .../nsa/dmaapMMAgent/dao/CreateMirrorMaker.java | 45 ++ .../nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java | 44 ++ .../nsa/dmaapMMAgent/dao/ListMirrorMaker.java | 46 ++ .../nsa/dmaapMMAgent/dao/MirrorMaker.java | 72 +++ .../nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java | 44 ++ .../nsa/dmaapMMAgent/dao/UpdateWhiteList.java | 44 ++ .../utils/MirrorMakerProcessHandler.java | 154 ++++++ src/main/resources/log4j.properties | 37 ++ src/main/scripts/mmagent | 17 + .../mirroragent/nsa/dmaapMMAgent/AppTest.java | 60 +++ 19 files changed, 1618 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 pom.xml create mode 100644 src/main/config/consumer.properties create mode 100644 src/main/config/mmagent.config create mode 100644 src/main/config/mmagent.config_old create mode 100644 src/main/config/producer.properties create mode 100644 src/main/config/template.lrm.xml create mode 100644 src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/MirrorMakerAgent.java create mode 100644 src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java create mode 100644 src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java create mode 100644 src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/ListMirrorMaker.java create mode 100644 src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/MirrorMaker.java create mode 100644 src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java create mode 100644 src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateWhiteList.java create mode 100644 src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java create mode 100644 src/main/resources/log4j.properties create mode 100644 src/main/scripts/mmagent create mode 100644 src/test/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/AppTest.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b83d222 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2ce945c --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..1cb4120 --- /dev/null +++ b/pom.xml @@ -0,0 +1,166 @@ + + + 4.0.0 + + org.onap.dmaap.messagerouter.mirroragent + dmaapMMAgent + 1.0.0-SNAPSHOT + jar + + + org.onap.oparent + oparent + 1.0.0-SNAPSHOT + + + dmaapMMAgent + Mirror Maker Agent - Repliaction agent + https://github.com/att/dmaap-framework + + + UTF-8 + + + + Apache License Version 2.0 + + + + + Jackie + + ATT + www.att.com + + + + + + junit + junit + 3.8.1 + test + + + com.google.code.gson + gson + 2.6.2 + + + log4j + log4j + 1.2.17 + + + org.jasypt + jasypt + 1.9.2 + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.4 + + -Xdoclint:none + + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.0.0 + + + attach-sources + + jar-no-fork + + + + + + org.codehaus.mojo + cobertura-maven-plugin + 2.7 + + + html + xml + + + + + maven-assembly-plugin + 2.4 + + + jar-with-dependencies + + + + + true + org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.MirrorMakerAgent + + + + + + + make-assembly + package + + single + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + + sign-artifacts + verify + + sign + + + + + + + + diff --git a/src/main/config/consumer.properties b/src/main/config/consumer.properties new file mode 100644 index 0000000..08d29af --- /dev/null +++ b/src/main/config/consumer.properties @@ -0,0 +1,50 @@ +############################################################################### +# ============LICENSE_START======================================================= +# org.onap.dmaap +# ================================================================================ +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.consumer.ConsumerConfig for more details + +# Zookeeper connection string +# comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" +zookeeper.connect=172.18.0.1:2181 + +# timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=6000 + +#consumer group id +group.id=test-consumer-group + +#consumer timeout +#consumer.timeout.ms=5000 diff --git a/src/main/config/mmagent.config b/src/main/config/mmagent.config new file mode 100644 index 0000000..9e43eae --- /dev/null +++ b/src/main/config/mmagent.config @@ -0,0 +1,7 @@ +#kafkahome=C:/dev/att/kafka_2.10-0.8.2.1 +kafkahome=/opt/ +topicURL=http://172.18.0.1:3904 +#topicname=org.onap.dmaap.messagerouter.mirroragent.agenttest +topicname=org.openecomp.dmaapBC.mmatopic +mechid=dgl@openecomp.org +password=ecomp_admin \ No newline at end of file diff --git a/src/main/config/mmagent.config_old b/src/main/config/mmagent.config_old new file mode 100644 index 0000000..a0b80df --- /dev/null +++ b/src/main/config/mmagent.config_old @@ -0,0 +1,5 @@ +kafkahome=/opt/app/dmaap/msgrtr/kafka +topicURL=http://:3904 +topicname=org.onap.dmaap.messagerouter.mirroragent.agenttest +mechid= +password= \ No newline at end of file diff --git a/src/main/config/producer.properties b/src/main/config/producer.properties new file mode 100644 index 0000000..30df665 --- /dev/null +++ b/src/main/config/producer.properties @@ -0,0 +1,74 @@ +############################################################################### +# ============LICENSE_START======================================================= +# org.onap.dmaap +# ================================================================================ +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.producer.ProducerConfig for more details + +############################# Producer Basics ############################# + +# list of brokers used for bootstrapping knowledge about the rest of the cluster +# format: host1:port1,host2:port2 ... +metadata.broker.list=104.130.132.211:9092 + +# name of the partitioner class for partitioning events; default partition spreads data randomly +#partitioner.class= + +# specifies whether the messages are sent asynchronously (async) or synchronously (sync) +producer.type=sync + +# specify the compression codec for all data generated: none, gzip, snappy, lz4. +# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively +compression.codec=none + +# message encoder +serializer.class=kafka.serializer.DefaultEncoder + +# allow topic level compression +#compressed.topics= + +############################# Async Producer ############################# +# maximum time, in milliseconds, for buffering data on the producer queue +#queue.buffering.max.ms= + +# the maximum size of the blocking queue for buffering on the producer +#queue.buffering.max.messages= + +# Timeout for event enqueue: +# 0: events will be enqueued immediately or dropped if the queue is full +# -ve: enqueue will block indefinitely if the queue is full +# +ve: enqueue will block up to this many milliseconds if the queue is full +#queue.enqueue.timeout.ms= + +# the number of messages batched at the producer +#batch.num.messages= diff --git a/src/main/config/template.lrm.xml b/src/main/config/template.lrm.xml new file mode 100644 index 0000000..a36cf2c --- /dev/null +++ b/src/main/config/template.lrm.xml @@ -0,0 +1,142 @@ + + + + + + + __SOA_CLOUD_NAMESPACE__.${artifactId} + + __MAJOR_VERSION__ + __MINOR_VERSION__ + __PATCH_VERSION__ + + + + Java + + ATT + /opt/app/dmaap/mmagent + + process.path + /usr/bin:/usr/sbin:${PATH} + + + process.workdir + /opt/app/dmaap/mmagent + + + process.libpath + ${LD_LIBRARY_PATH} + + + + + + + + jmx.port + __JMX_PORT_MRAGENT__ + + + + + + + jvm.version + __JAVA_VERSION__ + + + jvm.classpath + :.:${CLASSPATH}:/opt/app/dmaap/mmagent/etc:/opt/app/dmaap/mmagent/lib/*: + + + jvm.args.pre + __PRE_JVM_ARGS__ -XX:MaxPermSize=__MAX_PERM_SIZE__ + -XX:PermSize=__PERM_SIZE__ + __INTROSCOPE_VARS__ + -Djava.net.preferIPv4Stack=true + -DMMAGENTHOME=/opt/app/dmaap/mmagent + __POST_JVM_ARGS__ + __SCLD_OPTIONAL_PLATFORM_FLAG__ + -DMMAGENTHOME=/opt/app/dmaap/mmagent + + + + jvm.heap.min + __MIN_HEAP_SIZE__ + + + jvm.heap.max + __MAX_HEAP_SIZE__ + + + start.class + org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.MirrorMakerAgent + + + stdout.redirect + log/stdout.log + + + stderr.redirect + log/stdout.log + + + validatePID.waitime.seconds + __LRM_VALIDATEPID_WAITTIME_SECONDS__ + + + mbean.name + + JmxInterface:type=DME2 + + msgrtr + __LRM_RESOURCE_START_TYPE__ + __LRM_START_PRIORITY__ + __LRM_START_TIMEOUT__ + __RESOURCE_MIN_COUNT__ + __RESOURCE_MAX_COUNT__ + __LRM_RESOURCE_MAX_RESTART__ + __LRM_RESOURCE_HEARTBEAT__ + __LRM_RESOURCE_HEARTBEAT_FAILED_LIMIT__ + __LRM_RESOURCE_HEARTBEAT_TIMEOUT__ + __RESOURCE_MANAGER_WAIT_TIME_IN_SECONDS__ + __LRM_RESOURCE_REGISTRATION__ + dmaap + + + WARNING + __CLDLRM_WARNING_NOTIFY__ + + + SEVERE + __CLDLRM_SEVERE_NOTIFY__ + + + + diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/MirrorMakerAgent.java new file mode 100644 index 0000000..b6644b6 --- /dev/null +++ b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/MirrorMakerAgent.java @@ -0,0 +1,588 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent; + +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.Properties; + +import org.apache.log4j.Logger; +import org.jasypt.util.text.BasicTextEncryptor; +import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.CreateMirrorMaker; +import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.DeleteMirrorMaker; +import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.ListMirrorMaker; +import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.MirrorMaker; +import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.UpdateMirrorMaker; +import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao.UpdateWhiteList; +import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler; + +import com.google.gson.Gson; +import com.google.gson.internal.LinkedTreeMap; +import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny; +import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; + +public class MirrorMakerAgent { + static final Logger logger = Logger.getLogger(MirrorMakerAgent.class); + Properties mirrorMakerProperties = new Properties(); + ListMirrorMaker mirrorMakers = null; + String mmagenthome = ""; + String kafkahome = ""; + String topicURL = ""; + String topicname = ""; + String mechid = ""; + String password = ""; + private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J"; + + public static void main(String[] args) { + if (args != null && args.length == 2) { + if (args[0].equals("-encrypt")) { + BasicTextEncryptor textEncryptor = new BasicTextEncryptor(); + textEncryptor.setPassword(secret); + String plainText = textEncryptor.encrypt(args[1]); + System.out.println("Encrypted Password is :" + plainText); + return; + } + } else if (args != null && args.length > 0) { + System.out.println( + "Usage: ./mmagent to run with the configuration \n -encrypt to Encrypt Password for config "); + return; + } + MirrorMakerAgent agent = new MirrorMakerAgent(); + if (agent.checkStartup()) { + logger.info("mmagent started, loading properties"); + agent.checkAgentProcess(); + agent.readAgentTopic(); + } else { + System.out.println( + "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic"); + } + } + + private boolean checkStartup() { + FileInputStream input = null; + try { + this.mmagenthome = System.getProperty("MMAGENTHOME"); + input = new FileInputStream(mmagenthome + "/etc/mmagent.config"); + logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config"); + } catch (IOException ex) { + logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file"); + return false; + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + loadProperties(); + input = null; + try { + input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh"); + logger.info("kakahome is set :" + kafkahome); + } catch (IOException ex) { + logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly"); + return false; + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + String response = publishTopic("{\"test\":\"test\"}"); + if (response.startsWith("ERROR:")) { + logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:" + + this.topicURL + " Error is: " + response); + return false; + } + logger.info("Published to Topic :" + this.topicname + " Successfully"); + response = subscribeTopic("1"); + if (response != null && response.startsWith("ERROR:")) { + logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:" + + this.topicURL + " Error is: " + response); + return false; + } + logger.info("Subscribed to Topic :" + this.topicname + " Successfully"); + return true; + } + + private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) { + InputStream input = null; + OutputStream out = null; + try { + if (refresh) { + throw new IOException(); + } + input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties"); + } catch (IOException ex) { + try { + input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties"); + Properties prop = new Properties(); + prop.load(input); + if (propName.equals("consumer")) { + prop.setProperty("group.id", agentName); + prop.setProperty("zookeeper.connect", info); + } else { + prop.setProperty("metadata.broker.list", info); + } + out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties"); + prop.store(out, ""); + + } catch (Exception e) { + e.printStackTrace(); + } + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + if (out != null) { + try { + out.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + private void checkAgentProcess() { + logger.info("Checking MirrorMaker Process"); + if (mirrorMakers != null) { + int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); + for (int i = 0; i < mirrorMakersCount; i++) { + MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); + if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) { + checkPropertiesFile(mm.name, "consumer", mm.consumer, false); + checkPropertiesFile(mm.name, "producer", mm.producer, false); + + if (mm.whitelist != null && !mm.whitelist.equals("")) { + logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); + MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name, + mmagenthome + "/etc/" + mm.name + "consumer.properties", + mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist); + mm.setStatus("RESTARTING"); + + } else { + logger.info("MirrorMaker " + mm.name + " is STOPPED"); + mm.setStatus("STOPPED"); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + mirrorMakers.getListMirrorMaker().set(i, mm); + } else { + logger.info("MirrorMaker " + mm.name + " is running"); + mm.setStatus("RUNNING"); + mirrorMakers.getListMirrorMaker().set(i, mm); + } + } + } + // Gson g = new Gson(); + // System.out.println(g.toJson(mirrorMakers)); + } + + private String subscribeTopic(String timeout) { + String response = ""; + try { + String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout + + "&limit=1"; + String authString = this.mechid + ":" + this.password; + String authStringEnc = Base64.encode(authString.getBytes()); + URL url = new URL(requestURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setDoOutput(true); + connection.setRequestProperty("Authorization", "Basic " + authStringEnc); + connection.setRequestProperty("Content-Type", "application/json"); + InputStream content = (InputStream) connection.getInputStream(); + BufferedReader in = new BufferedReader(new InputStreamReader(content)); + String line; + + while ((line = in.readLine()) != null) { + response = response + line; + } + Gson g = new Gson(); + // get message as JSON String Array + String[] topicMessage = g.fromJson(response, String[].class); + if (topicMessage.length != 0) { + return topicMessage[0]; + } + } catch (Exception e) { + return "ERROR:" + e.getMessage() + " Server Response is:" + response; + } + return null; + } + + private String publishTopic(String message) { + try { + String requestURL = this.topicURL + "/events/" + this.topicname; + String authString = this.mechid + ":" + this.password; + String authStringEnc = Base64.encode(authString.getBytes()); + URL url = new URL(requestURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Authorization", "Basic " + authStringEnc); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setRequestProperty("Content-Length", Integer.toString(message.length())); + DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); + wr.write(message.getBytes()); + + InputStream content = (InputStream) connection.getInputStream(); + BufferedReader in = new BufferedReader(new InputStreamReader(content)); + String line; + String response = ""; + while ((line = in.readLine()) != null) { + response = response + line; + } + return response; + + } catch (Exception e) { + return "ERROR:" + e.getLocalizedMessage(); + } + } + + private void readAgentTopic() { + try { + int connectionattempt = 0; + while (true) { + logger.info("--------------------------------"); + logger.info("Waiting for Messages for 60 secs"); + String topicMessage = subscribeTopic("60000"); + Gson g = new Gson(); + LinkedTreeMap object = null; + if (topicMessage != null) { + try { + object = g.fromJson(topicMessage, LinkedTreeMap.class); + + // Cast the 1st item (since limit=1 and see the type of + // object + if (object.get("createMirrorMaker") != null) { + logger.info("Received createMirrorMaker request from topic"); + CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class); + createMirrorMaker(m.getCreateMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("updateMirrorMaker") != null) { + logger.info("Received updateMirrorMaker request from topic"); + UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); + updateMirrorMaker(m.getUpdateMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("deleteMirrorMaker") != null) { + logger.info("Received deleteMirrorMaker request from topic"); + DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class); + deleteMirrorMaker(m.getDeleteMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("listAllMirrorMaker") != null) { + logger.info("Received listALLMirrorMaker request from topic"); + checkAgentProcess(); + mirrorMakers.setMessageID((String) object.get("messageID")); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("updateWhiteList") != null) { + logger.info("Received updateWhiteList request from topic"); + UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class); + updateWhiteList(m.getUpdateWhiteList()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("listMirrorMaker") != null) { + logger.info("Received listMirrorMaker from topic, skipping messages"); + } else { + logger.info("Received unknown request from topic"); + } + } catch (Exception ex) { + connectionattempt++; + if (connectionattempt > 5) { + logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage); + return; + } + logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt + + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage()); + Thread.sleep(60000); + } + } else { + // Check all MirrorMaker every min + connectionattempt = 0; + checkAgentProcess(); + } + + } + } catch (Exception e) { + e.printStackTrace(); + } + + } + + private void createMirrorMaker(MirrorMaker newMirrorMaker) { + boolean exists = false; + if (mirrorMakers != null) { + int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); + for (int i = 0; i < mirrorMakersCount; i++) { + MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); + if (mm.name.equals(newMirrorMaker.name)) { + exists = true; + logger.info("MirrorMaker already exist for:" + newMirrorMaker.name); + return; + } + } + } + logger.info("Adding new MirrorMaker:" + newMirrorMaker.name); + if (exists == false && mirrorMakers != null) { + mirrorMakers.getListMirrorMaker().add(newMirrorMaker); + } else if (exists == false && mirrorMakers == null) { + mirrorMakers = new ListMirrorMaker(); + ArrayList list = mirrorMakers.getListMirrorMaker(); + list = new ArrayList(); + list.add(newMirrorMaker); + mirrorMakers.setListMirrorMaker(list); + } + checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true); + checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true); + + Gson g = new Gson(); + mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); + OutputStream out = null; + try { + out = new FileOutputStream(mmagenthome + "/etc/mmagent.config"); + mirrorMakerProperties.store(out, ""); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + private void updateMirrorMaker(MirrorMaker newMirrorMaker) { + boolean exists = false; + if (mirrorMakers != null) { + int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); + for (int i = 0; i < mirrorMakersCount; i++) { + MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); + if (mm.name.equals(newMirrorMaker.name)) { + exists = true; + mm.setConsumer(newMirrorMaker.getConsumer()); + mm.setProducer(newMirrorMaker.getProducer()); + mirrorMakers.getListMirrorMaker().set(i, mm); + logger.info("Updating MirrorMaker:" + newMirrorMaker.name); + } + } + } + if (exists) { + checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true); + checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true); + + Gson g = new Gson(); + mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); + OutputStream out = null; + try { + out = new FileOutputStream(mmagenthome + "/etc/mmagent.config"); + mirrorMakerProperties.store(out, ""); + MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } else { + logger.info("MirrorMaker Not found for:" + newMirrorMaker.name); + } + } + + private void updateWhiteList(MirrorMaker newMirrorMaker) { + boolean exists = false; + if (mirrorMakers != null) { + int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); + for (int i = 0; i < mirrorMakersCount; i++) { + MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); + if (mm.name.equals(newMirrorMaker.name)) { + exists = true; + mm.setWhitelist(newMirrorMaker.whitelist); + mirrorMakers.getListMirrorMaker().set(i, mm); + logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:" + + newMirrorMaker.whitelist); + } + } + } + if (exists) { + Gson g = new Gson(); + mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); + OutputStream out = null; + try { + out = new FileOutputStream(mmagenthome + "/etc/mmagent.config"); + mirrorMakerProperties.store(out, ""); + MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } else { + logger.info("MirrorMaker Not found for:" + newMirrorMaker.name); + } + } + + private void deleteMirrorMaker(MirrorMaker newMirrorMaker) { + boolean exists = false; + if (mirrorMakers != null) { + int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); + for (int i = 0; i < mirrorMakersCount; i++) { + MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); + if (mm.name.equals(newMirrorMaker.name)) { + exists = true; + mirrorMakers.getListMirrorMaker().remove(i); + logger.info("Removing MirrorMaker:" + newMirrorMaker.name); + i = mirrorMakersCount; + } + } + } + if (exists) { + try { + String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties"; + File file = new File(path); + file.delete(); + } catch (Exception ex) { + } + try { + String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties"; + File file = new File(path); + file.delete(); + } catch (Exception ex) { + } + Gson g = new Gson(); + mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); + OutputStream out = null; + try { + out = new FileOutputStream(mmagenthome + "/etc/mmagent.config"); + mirrorMakerProperties.store(out, ""); + MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } else { + logger.info("MirrorMaker Not found for:" + newMirrorMaker.name); + } + } + + private void loadProperties() { + InputStream input = null; + try { + + input = new FileInputStream(mmagenthome + "/etc/mmagent.config"); + mirrorMakerProperties.load(input); + Gson g = new Gson(); + if (mirrorMakerProperties.getProperty("mirrormakers") == null) { + this.mirrorMakers = new ListMirrorMaker(); + ArrayList list = this.mirrorMakers.getListMirrorMaker(); + list = new ArrayList(); + this.mirrorMakers.setListMirrorMaker(list); + } else { + this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"), + ListMirrorMaker.class); + } + + this.kafkahome = mirrorMakerProperties.getProperty("kafkahome"); + this.topicURL = mirrorMakerProperties.getProperty("topicURL"); + this.topicname = mirrorMakerProperties.getProperty("topicname"); + this.mechid = mirrorMakerProperties.getProperty("mechid"); + + BasicTextEncryptor textEncryptor = new BasicTextEncryptor(); + textEncryptor.setPassword(secret); + //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); + this.password = mirrorMakerProperties.getProperty("password"); + } catch (IOException ex) { + // ex.printStackTrace(); + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException e) { + // e.printStackTrace(); + } + } + } + + } +} diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java new file mode 100644 index 0000000..bf1207a --- /dev/null +++ b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java @@ -0,0 +1,45 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao; + +public class CreateMirrorMaker { + String messageID; + MirrorMaker createMirrorMaker; + + public MirrorMaker getCreateMirrorMaker() { + return createMirrorMaker; + } + + public void setCreateMirrorMaker(MirrorMaker createMirrorMaker) { + this.createMirrorMaker = createMirrorMaker; + } + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } + +} diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java new file mode 100644 index 0000000..dcabff6 --- /dev/null +++ b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao; + +public class DeleteMirrorMaker { + String messageID; + MirrorMaker deleteMirrorMaker; + + public MirrorMaker getDeleteMirrorMaker() { + return deleteMirrorMaker; + } + + public void setDeleteMirrorMaker(MirrorMaker deleteMirrorMaker) { + this.deleteMirrorMaker = deleteMirrorMaker; + } + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } +} diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/ListMirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/ListMirrorMaker.java new file mode 100644 index 0000000..56953be --- /dev/null +++ b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/ListMirrorMaker.java @@ -0,0 +1,46 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao; + +import java.util.ArrayList; + +public class ListMirrorMaker { + String messageID; + ArrayList listMirrorMaker; + + public ArrayList getListMirrorMaker() { + return listMirrorMaker; + } + + public void setListMirrorMaker(ArrayList createMirrorMaker) { + this.listMirrorMaker = createMirrorMaker; + } + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } +} diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/MirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/MirrorMaker.java new file mode 100644 index 0000000..d496aea --- /dev/null +++ b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/MirrorMaker.java @@ -0,0 +1,72 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao; + +public class MirrorMaker { + public String name; + public String consumer; + public String producer; + public String whitelist; + public String status; + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getConsumer() { + return consumer; + } + + public void setConsumer(String consumer) { + this.consumer = consumer; + } + + public String getProducer() { + return producer; + } + + public void setProducer(String producer) { + this.producer = producer; + } + + public String getWhitelist() { + return whitelist; + } + + public void setWhitelist(String whitelist) { + this.whitelist = whitelist; + } + +} \ No newline at end of file diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java new file mode 100644 index 0000000..d78054f --- /dev/null +++ b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao; + +public class UpdateMirrorMaker { + String messageID; + MirrorMaker updateMirrorMaker; + + public MirrorMaker getUpdateMirrorMaker() { + return updateMirrorMaker; + } + + public void setUpdateMirrorMaker(MirrorMaker updateMirrorMaker) { + this.updateMirrorMaker = updateMirrorMaker; + } + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } +} diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateWhiteList.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateWhiteList.java new file mode 100644 index 0000000..215fb34 --- /dev/null +++ b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/dao/UpdateWhiteList.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.dao; + +public class UpdateWhiteList { + String messageID; + MirrorMaker updateWhiteList; + + public MirrorMaker getUpdateWhiteList() { + return updateWhiteList; + } + + public void setUpdateWhiteList(MirrorMaker updateWhiteList) { + this.updateWhiteList = updateWhiteList; + } + + public String getMessageID() { + return messageID; + } + + public void setMessageID(String messageID) { + this.messageID = messageID; + } +} diff --git a/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java new file mode 100644 index 0000000..05c81be --- /dev/null +++ b/src/main/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java @@ -0,0 +1,154 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.utils; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.log4j.Logger; +import org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent.MirrorMakerAgent; + +public class MirrorMakerProcessHandler { + static final Logger logger = Logger.getLogger(MirrorMakerProcessHandler.class); + + public static boolean checkMirrorMakerProcess(String agentname) { + try { + Runtime rt = Runtime.getRuntime(); + Process mmprocess = null; + + if (System.getProperty("os.name").contains("Windows")) { + String args = ""; + args = "wmic.exe process where \"commandline like '%agentname=" + agentname + + "~%' and caption='java.exe'\""; + mmprocess = rt.exec(args); + } else { + String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" }; + mmprocess = rt.exec(args); + } + + InputStream is = mmprocess.getInputStream(); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + String line; + while ((line = br.readLine()) != null) { + // System.out.println(line); + if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) { + return true; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + return false; + } + + public static void stopMirrorMaker(String agentname) { + try { + Runtime rt = Runtime.getRuntime(); + Process killprocess = null; + + if (System.getProperty("os.name").contains("Windows")) { + String args = "wmic.exe process where \"commandline like '%agentname=" + agentname + + "~%' and caption='java.exe'\" call terminate"; + killprocess = rt.exec(args); + } else { + String args[] = { "/bin/sh", "-c", + "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" }; + // args = "kill $(ps -ef |grep java |grep agentname=" + + // agentname + "~| awk '{print $2}')"; + killprocess = rt.exec(args); + } + + InputStream is = killprocess.getInputStream(); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + String line; + while ((line = br.readLine()) != null) { + // System.out.println(line); + } + + logger.info("Mirror Maker " + agentname + " Stopped"); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + public static void startMirrorMaker(String mmagenthome, String kafkaHome, String agentName, String consumerConfig, + String producerConfig, String whitelist) { + try { + Runtime rt = Runtime.getRuntime(); + + if (System.getProperty("os.name").contains("Windows")) { + String args = kafkaHome + "/bin/windows/kafka-run-class.bat -Dagentname=" + agentName + + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig + " --producer.config " + + producerConfig + " --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName + + "_MMaker.log"; + final Process process = rt.exec(args); + new Thread() { + public void run() { + try { + InputStream is = process.getInputStream(); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + String line; + while ((line = br.readLine()) != null) { + // System.out.println(line); + } + } catch (Exception anExc) { + anExc.printStackTrace(); + } + } + }.start(); + } else { + String args[] = { "/bin/sh", "-c", + kafkaHome + "/bin/kafka-run-class.sh -Dagentname=" + agentName + + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig + + " --producer.config " + producerConfig + " --whitelist '" + whitelist + "' >" + + mmagenthome + "/logs/" + agentName + "_MMaker.log 2>&1" }; + final Process process = rt.exec(args); + new Thread() { + public void run() { + try { + InputStream is = process.getInputStream(); + InputStreamReader isr = new InputStreamReader(is); + BufferedReader br = new BufferedReader(isr); + String line; + while ((line = br.readLine()) != null) { + // System.out.println(line); + } + } catch (Exception anExc) { + anExc.printStackTrace(); + } + } + }.start(); + } + + logger.info("Mirror Maker " + agentName + " Started" + " WhiteListing:" + whitelist); + + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties new file mode 100644 index 0000000..a0ac24f --- /dev/null +++ b/src/main/resources/log4j.properties @@ -0,0 +1,37 @@ +############################################################################### +# ============LICENSE_START======================================================= +# org.onap.dmaap +# ================================================================================ +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +############################################################################### +# Root logger option +log4j.rootLogger=INFO, stdout, file + +# Redirect log messages to console +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p - %m %n + +# Redirect log messages to a log file, support file rolling. +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.File= ${MMAGENTHOME}/logs/mmagent.log +log4j.appender.file.MaxFileSize=5MB +log4j.appender.file.MaxBackupIndex=10 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p - %m %n \ No newline at end of file diff --git a/src/main/scripts/mmagent b/src/main/scripts/mmagent new file mode 100644 index 0000000..18a75ea --- /dev/null +++ b/src/main/scripts/mmagent @@ -0,0 +1,17 @@ +#!/bin/sh + +JAVA_HOMES="${INSTALL_ROOT}/opt/app/java/jdk/jdk170 ${INSTALL_ROOT}/opt/app/java/jdk/jdk160" +for jhome in ${JAVA_HOMES}; do + if [ -x "${jhome}"/bin/java ]; then + export JAVA_HOME=${jhome} + fi +done + +ROOT_DIR=`dirname $0`/.. +ROOT_DIR=`cd $ROOT_DIR; pwd` +CLASSPATH=${ROOT_DIR}'/lib/*' +PATH=${JAVA_HOME}/bin:${PATH} +export JAVA_HOME CLASSPATH PATH + +exec java -DMMAGENTHOME=$ROOT_DIR com.att.nsa.dmaapMMAgent.MirrorMakerAgent "$@" + diff --git a/src/test/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/AppTest.java b/src/test/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/AppTest.java new file mode 100644 index 0000000..19a48b8 --- /dev/null +++ b/src/test/java/org/onap/dmaap/messagerouter/mirroragent/nsa/dmaapMMAgent/AppTest.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package org.onap.dmaap.messagerouter.mirroragent.nsa.dmaapMMAgent; + +import junit.framework.Test; +import junit.framework.TestCase; +import junit.framework.TestSuite; + +/** + * Unit test for simple App. + */ +public class AppTest + extends TestCase +{ + /** + * Create the test case + * + * @param testName name of the test case + */ + public AppTest( String testName ) + { + super( testName ); + } + + /** + * @return the suite of tests being tested + */ + public static Test suite() + { + return new TestSuite( AppTest.class ); + } + + /** + * Rigourous Test :-) + */ + public void testApp() + { + assertTrue( true ); + } +} -- 2.16.6