1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.mr.dmaapMMAgent;
24 import com.google.gson.Gson;
25 import com.google.gson.internal.LinkedTreeMap;
27 import java.io.FileInputStream;
28 import java.io.FileOutputStream;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.OutputStream;
32 import java.util.ArrayList;
33 import java.util.Properties;
34 import org.apache.logging.log4j.LogManager;
35 import org.apache.logging.log4j.Logger;
36 import org.jasypt.util.text.BasicTextEncryptor;
37 import org.json.JSONObject;
38 import org.onap.dmaap.mr.dmaapMMAgent.dao.CreateMirrorMaker;
39 import org.onap.dmaap.mr.dmaapMMAgent.dao.DeleteMirrorMaker;
40 import org.onap.dmaap.mr.dmaapMMAgent.dao.ListMirrorMaker;
41 import org.onap.dmaap.mr.dmaapMMAgent.dao.MirrorMaker;
42 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateMirrorMaker;
43 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateWhiteList;
44 import org.onap.dmaap.mr.dmaapMMAgent.utils.MirrorMakerProcessHandler;
46 public class MirrorMakerAgent {
// Logger used by every method of the agent.
47 private static final Logger logger = LogManager.getLogger(MirrorMakerAgent.class);
// Agent configuration; loaded from and stored back to <mmagenthome>/etc/mmagent.config.
48 Properties mirrorMakerProperties = new Properties();
// MirrorMaker definitions managed by this agent; null until loadProperties() or createMirrorMaker() assigns it.
49 ListMirrorMaker mirrorMakers = null;
// Base directory; the agent reads and writes files under <mmagenthome>/etc/.
50 String mmagenthome = "/opt";
// Kafka installation directory; assigned from the "kafkahome" property in loadProperties().
51 String kafkahome = "";
// Topic passed to TopicUtil publish/subscribe calls; assigned from the "topicname" property.
53 String topicname = "";
// NOTE(review): the code that reads this flag is not visible in this listing (the embedded
// line numbers skip values) - presumably it ends the polling loop in readAgentTopic(); confirm.
57 public boolean exitLoop = false;
// Helper through which all publishTopic/subscribeTopic calls are made.
58 TopicUtil topicUtil = new TopicUtil();
// Password given to BasicTextEncryptor in main() (encrypt) and loadProperties() (decrypt).
// NOTE(review): the secret is hard-coded in source - consider supplying it externally.
59 private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
/**
 * Command-line entry point of the agent.
 *
 * <p>When exactly two arguments are given and the first is {@code -encrypt}, the second
 * argument is encrypted with {@link BasicTextEncryptor} (keyed by {@code secret}) and the
 * result is printed to standard output. Otherwise an agent instance is created, its startup
 * check is run, and on success the MirrorMaker processes are checked and the agent topic
 * is read.
 *
 * <p>NOTE(review): the embedded line numbers of this listing skip values (68-69, 71, 73-74,
 * 78, 82, 84-85), so the statements that separate these paths are not visible here -
 * confirm the exact control flow against the complete file.
 *
 * @param args command-line arguments; see the usage text below
 */
61 public static void main(String[] args) {
62 if (args != null && args.length == 2) {
63 if (args[0].equals("-encrypt")) {
// Encrypt the supplied password with the built-in secret so it can be placed in the config.
64 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
65 textEncryptor.setPassword(secret);
// Despite its name, plainText holds the encrypted value.
66 String plainText = textEncryptor.encrypt(args[1]);
67 System.out.println("Encrypted Password is :" + plainText);
70 } else if (args != null && args.length > 0) {
72 "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
75 MirrorMakerAgent agent = new MirrorMakerAgent();
76 if (agent.checkStartup()) {
77 logger.info("mmagent started, loading properties");
79 agent.checkAgentProcess();
80 } catch (Exception e) {
// A failure of the process check is logged only; reading the topic is still attempted.
81 logger.error("exception occured in checkAgentProcess ", e);
83 agent.readAgentTopic();
86 "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
/**
 * Verifies the prerequisites the agent needs before it starts working.
 *
 * <p>The visible steps are: open {@code <mmagenthome>/etc/mmagent.config}, open
 * {@code <kafkahome>/bin/kafka-run-class.sh}, publish a test message to the agent topic,
 * and subscribe to the agent topic. Each failure is logged.
 *
 * <p>NOTE(review): the {@code return} statements, the stream clean-up and whatever
 * populates {@code kafkahome}/{@code topicURL}/{@code mechid}/{@code password} between
 * these steps are not visible in this listing (embedded line numbers skip values) -
 * confirm against the complete file.
 *
 * @return the outcome of the startup check; {@code main} treats {@code true} as success
 */
90 private boolean checkStartup() {
91 FileInputStream input = null;
// Step 1: the agent configuration file must be openable.
93 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
94 logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
95 } catch (IOException ex) {
96 logger.error(mmagenthome + "/etc/mmagent.config not found.", ex);
102 } catch (IOException e) {
// NOTE(review): concatenating the exception logs only its toString() and drops the stack
// trace; other handlers in this file pass the exception as the second argument.
103 logger.error("exception occured in checkStartup "+e);
// Step 2: the Kafka launcher script must be openable under kafkahome.
110 input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
111 logger.info("kafkahome is set :" + kafkahome);
112 } catch (IOException ex) {
113 logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly", ex);
119 } catch (IOException e) {
// NOTE(review): same stack-trace loss as above.
120 logger.error("exception occured in checkStartup "+e);
// Step 3: publish a fixed test payload; a response beginning with "ERROR:" counts as failure.
// NOTE(review): response is dereferenced without a null check here, while the subscribe
// response below is null-checked - confirm publishTopic never returns null.
124 String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}");
125 if (response.startsWith("ERROR:")) {
126 logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
127 + this.topicURL + " Error is: " + response);
130 logger.info("Published to Topic :" + this.topicname + " Successfully");
// Step 4: subscribe once; a null response is not treated as an error.
131 response = topicUtil.subscribeTopic(topicURL, topicname, "1", mechid, password);
132 if (response != null && response.startsWith("ERROR:")) {
133 logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
134 + this.topicURL + " Error is: " + response);
137 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
/**
 * Makes sure the per-MirrorMaker properties file
 * {@code <mmagenthome>/etc/<mm.name><propName>.properties} is available.
 *
 * <p>The method first tries to open that file. In the {@code IOException} handler it opens
 * the template {@code <mmagenthome>/etc/<propName>.properties}, sets the connection
 * properties from {@code mm} and opens the per-MirrorMaker file for writing.
 *
 * <p>NOTE(review): the guard around {@code throw new IOException()}, the load/store calls
 * and the stream clean-up are not visible in this listing (embedded line numbers skip
 * values) - confirm against the complete file.
 *
 * @param mm       MirrorMaker whose name, consumer and producer values are written
 * @param propName {@code "consumer"} selects the consumer settings; any other value gets
 *                 the producer settings
 * @param refresh  presumably forces regeneration of the file - its use is not visible here
 */
141 private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) {
142 InputStream input = null;
143 OutputStream out = null;
// NOTE(review): an exception is thrown purely to reach the regeneration code in the catch
// block below; the condition guarding this throw is not visible in this listing.
146 throw new IOException();
148 input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
149 } catch (IOException ex) {
// Build the file from the shared template for this property kind.
151 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
152 Properties prop = new Properties();
154 if (propName.equals("consumer")) {
// The consumer group id is the MirrorMaker name.
155 prop.setProperty("group.id", mm.name);
157 prop.setProperty("bootstrap.servers", mm.consumer);
158 prop.setProperty("client.id", mm.name + "MM_consumer");
160 prop.setProperty("bootstrap.servers", mm.producer);
161 prop.setProperty("client.id", mm.name + "MM_producer");
164 out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
167 } catch (Exception e) {
// NOTE(review): concatenating the exception drops its stack trace; the handler below
// passes it as the second argument instead.
168 logger.error("exception occured in checkPropertiesFile "+e);
174 } catch (IOException e) {
175 logger.error("exception occured in checkPropertiesFile ", e);
181 } catch (IOException e) {
// NOTE(review): same stack-trace loss as above.
182 logger.error("exception occured in checkPropertiesFile "+e);
/**
 * Checks every configured MirrorMaker process and records its status.
 *
 * <p>For each entry, {@code MirrorMakerProcessHandler.checkMirrorMakerProcess} is asked
 * whether the process is running. When it is not, the consumer and producer property
 * files are checked and, if the entry has a non-empty whitelist, the process is started
 * and the status set to {@code RESTARTING}. The other statuses written are
 * {@code STOPPED} and {@code RUNNING}.
 *
 * <p>NOTE(review): the {@code else} lines that pair the STOPPED and RUNNING branches
 * with their conditions, and the body of the {@code try} before the InterruptedException
 * handler, are not visible in this listing (embedded line numbers skip values).
 *
 * @throws Exception declared by the signature; the throwing statement is not identifiable
 *                   from the visible lines
 */
188 private void checkAgentProcess() throws Exception {
189 logger.info("Checking MirrorMaker Process");
190 if (mirrorMakers != null) {
191 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
192 for (int i = 0; i < mirrorMakersCount; i++) {
193 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
194 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name, mm.enablelogCheck,
195 this.grepLog) == false) {
// Process not running: make sure both property files exist (refresh = false).
196 checkPropertiesFile(mm, "consumer", false);
197 checkPropertiesFile(mm, "producer", false);
// Only an entry with a non-empty whitelist is started.
199 if (mm.whitelist != null && !mm.whitelist.equals("")) {
201 "MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
202 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
203 mmagenthome + "/etc/" + mm.name + "consumer.properties",
204 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.numStreams, mm.whitelist);
205 mm.setStatus("RESTARTING");
208 logger.info("MirrorMaker " + mm.name + " is STOPPED");
209 mm.setStatus("STOPPED");
// NOTE(review): no handler body is visible for this InterruptedException; if it is empty,
// the interrupt is swallowed - consider Thread.currentThread().interrupt().
213 } catch (InterruptedException e) {
// Write the entry with its updated status back into the list.
215 mirrorMakers.getListMirrorMaker().set(i, mm);
217 logger.info("MirrorMaker " + mm.name + " is running");
218 mm.setStatus("RUNNING");
219 mirrorMakers.getListMirrorMaker().set(i, mm);
223 // Gson g = new Gson();
224 // System.out.println(g.toJson(mirrorMakers));
/**
 * Polls the agent topic and hands each received message to
 * {@link #readAgent(LinkedTreeMap, String)}.
 *
 * <p>A message is fetched with {@code topicUtil.subscribeTopic} using {@code "60000"} as
 * the third argument, parsed with Gson into a {@link LinkedTreeMap}, and dispatched. When
 * handling throws, the failure is logged together with the attempt counter; once the
 * counter exceeds 5 the agent logs that it is shutting down.
 *
 * <p>NOTE(review): the loop statement, the declaration of {@code g}, the counter
 * increment and the code after "Check all MirrorMaker every min" are not visible in this
 * listing (embedded line numbers skip values) - confirm against the complete file.
 */
227 public void readAgentTopic() {
// Counts failed attempts; reset to 0 further down.
229 int connectionattempt = 0;
231 logger.info("--------------------------------");
232 logger.info("Waiting for Messages for 60 secs");
233 String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password);
235 LinkedTreeMap<?, ?> object = null;
236 if (topicMessage != null) {
238 // Check and parse if String object returned by consumer
240 // else use the jsonObject
// A payload starting with a double quote is a JSON string wrapping the real JSON document,
// so it is decoded to a plain String first.
// NOTE(review): topicMessage is already a String, so toString() is redundant.
241 if (topicMessage.startsWith("\"")) {
242 topicMessage = g.fromJson(topicMessage.toString(), String.class);
244 object = g.fromJson(topicMessage, LinkedTreeMap.class);
246 // Cast the 1st item (since limit=1 and see the type of
248 readAgent(object, topicMessage);
249 } catch (Exception ex) {
251 if (connectionattempt > 5) {
252 logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
255 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
256 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
260 // Check all MirrorMaker every min
261 connectionattempt = 0;
269 } catch (Exception e) {
270 logger.error("exception occured in readAgentTopic ", e);
/**
 * Registers a new MirrorMaker definition and persists the configuration.
 *
 * <p>The existing list is searched for an entry with the same name. If none exists the
 * new entry is appended (creating the list holder when {@code mirrorMakers} is still
 * null), the consumer and producer property files are regenerated with refresh = true,
 * and the serialized list is stored under the {@code mirrormakers} key in
 * {@code <mmagenthome>/etc/mmagent.config}.
 *
 * <p>NOTE(review): the statement that sets {@code exists}, the declaration of {@code g}
 * and the stream clean-up are not visible in this listing (embedded line numbers skip
 * values), so which of the steps above are skipped for a duplicate name cannot be
 * confirmed from here.
 *
 * @param newMirrorMaker definition to add; its {@code name} is the uniqueness key
 */
275 public void createMirrorMaker(MirrorMaker newMirrorMaker) {
276 boolean exists = false;
277 if (mirrorMakers != null) {
278 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
279 for (int i = 0; i < mirrorMakersCount; i++) {
280 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
281 if (mm.name.equals(newMirrorMaker.name)) {
283 logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
288 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
289 if (exists == false && mirrorMakers != null) {
290 mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
291 } else if (exists == false && mirrorMakers == null) {
// First MirrorMaker ever: create the holder and give it a fresh list.
292 mirrorMakers = new ListMirrorMaker();
// NOTE(review): the value fetched here is overwritten on the next line (dead store).
293 ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
294 list = new ArrayList<MirrorMaker>();
295 list.add(newMirrorMaker);
296 mirrorMakers.setListMirrorMaker(list);
// refresh = true is passed so the per-MirrorMaker property files are rebuilt.
298 checkPropertiesFile(newMirrorMaker, "consumer", true);
299 checkPropertiesFile(newMirrorMaker, "producer", true);
// Persist the complete list as JSON inside the agent configuration file.
302 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
303 OutputStream out = null;
305 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
306 mirrorMakerProperties.store(out, "");
307 } catch (IOException ex) {
308 logger.error("exception occured in createMirrorMaker ", ex);
313 } catch (IOException e) {
314 logger.error("exception occured in createMirrorMaker ", e);
/**
 * Applies changed settings to the MirrorMaker with the same name and persists the result.
 *
 * <p>On the matching entry, consumer and producer are replaced only when the incoming
 * values are non-null, and numStreams only when the incoming value is at least 1.
 * enablelogCheck is copied from the incoming definition. The visible code then regenerates
 * both property files, stores the configuration and stops the MirrorMaker process. A
 * "Not found" message is logged for an unknown name.
 *
 * <p>NOTE(review): the statement that sets {@code exists}, the guard around the persist
 * section, the declaration of {@code g} and the body of the {@code try} before the
 * InterruptedException handler are not visible in this listing (embedded line numbers
 * skip values).
 *
 * @param newMirrorMaker carrier of the new values; {@code name} selects the entry to update
 */
320 private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
321 boolean exists = false;
322 if (mirrorMakers != null) {
323 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
324 for (int i = 0; i < mirrorMakersCount; i++) {
325 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
326 if (mm.name.equals(newMirrorMaker.name)) {
// Only overwrite fields the request actually supplied.
328 if (null != newMirrorMaker.getConsumer()) {
329 mm.setConsumer(newMirrorMaker.getConsumer());
331 if (null != newMirrorMaker.getProducer()) {
332 mm.setProducer(newMirrorMaker.getProducer());
// readAgent() sets numStreams to 0 when the request omits it, so this check keeps the
// stored value in that case.
334 if (newMirrorMaker.getNumStreams() >= 1) {
335 mm.setNumStreams(newMirrorMaker.getNumStreams());
338 mm.setEnablelogCheck(newMirrorMaker.enablelogCheck);
340 mirrorMakers.getListMirrorMaker().set(i, mm);
342 logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
// NOTE(review): the files are generated from newMirrorMaker, not from the merged entry,
// so a consumer/producer omitted from the request is passed on as null - confirm intent.
347 checkPropertiesFile(newMirrorMaker, "consumer", true);
348 checkPropertiesFile(newMirrorMaker, "producer", true);
351 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
352 OutputStream out = null;
354 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
355 mirrorMakerProperties.store(out, "");
// The process is only stopped in the visible code of this method.
356 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
// NOTE(review): no handler body is visible for this InterruptedException; if it is empty,
// the interrupt is swallowed - consider Thread.currentThread().interrupt().
359 } catch (InterruptedException e) {
361 } catch (IOException ex) {
362 logger.error("exception occured in updateMirrorMaker ", ex);
367 } catch (IOException e) {
368 logger.error("exception occured in updateMirrorMaker ", e);
373 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Replaces the whitelist of the MirrorMaker with the same name and persists the result.
 *
 * <p>The matching entry receives the incoming whitelist. The visible code then stores the
 * serialized list under the {@code mirrormakers} key in
 * {@code <mmagenthome>/etc/mmagent.config} and stops the MirrorMaker process. A
 * "Not found" message is logged for an unknown name.
 *
 * <p>NOTE(review): the statement that sets {@code exists}, the guard around the persist
 * section, the declaration of {@code g} and the body of the {@code try} before the
 * InterruptedException handler are not visible in this listing (embedded line numbers
 * skip values).
 *
 * @param newMirrorMaker carrier of the new whitelist; {@code name} selects the entry
 */
377 private void updateWhiteList(MirrorMaker newMirrorMaker) {
378 boolean exists = false;
379 if (mirrorMakers != null) {
380 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
381 for (int i = 0; i < mirrorMakersCount; i++) {
382 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
383 if (mm.name.equals(newMirrorMaker.name)) {
// The whitelist is replaced unconditionally, including with null or an empty string.
385 mm.setWhitelist(newMirrorMaker.whitelist);
386 mirrorMakers.getListMirrorMaker().set(i, mm);
387 logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
388 + newMirrorMaker.whitelist);
394 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
395 OutputStream out = null;
397 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
398 mirrorMakerProperties.store(out, "");
// The process is only stopped in the visible code of this method.
399 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
// NOTE(review): no handler body is visible for this InterruptedException; if it is empty,
// the interrupt is swallowed - consider Thread.currentThread().interrupt().
402 } catch (InterruptedException e) {
404 } catch (IOException ex) {
405 logger.error("exception occured in updateWhiteList ", ex);
410 } catch (IOException e) {
411 logger.error("exception occured in updateWhiteList ", e);
416 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Removes the MirrorMaker with the same name and persists the result.
 *
 * <p>The matching entry is removed from the list. The visible code then builds the paths
 * of the entry's consumer and producer property files, stores the serialized list under
 * the {@code mirrormakers} key in {@code <mmagenthome>/etc/mmagent.config} and stops the
 * MirrorMaker process. A "Not found" message is logged for an unknown name.
 *
 * <p>NOTE(review): the statement that sets {@code exists}, the calls made on the two
 * {@code File} objects (presumably a delete), the declaration of {@code g} and the stream
 * clean-up are not visible in this listing (embedded line numbers skip values).
 *
 * @param newMirrorMaker definition whose {@code name} selects the entry to remove
 */
420 private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
421 boolean exists = false;
422 if (mirrorMakers != null) {
423 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
424 for (int i = 0; i < mirrorMakersCount; i++) {
425 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
426 if (mm.name.equals(newMirrorMaker.name)) {
428 mirrorMakers.getListMirrorMaker().remove(i);
429 logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
// Force the loop to end: the list is now one element shorter than the cached
// mirrorMakersCount, so continuing would index past its end.
430 i = mirrorMakersCount;
// Per-MirrorMaker consumer properties file.
436 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
437 File file = new File(path);
439 } catch (Exception ex) {
440 logger.error("exception occured in deleteMirrorMaker ", ex);
// Per-MirrorMaker producer properties file.
443 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
444 File file = new File(path);
446 } catch (Exception ex) {
447 logger.error("exception occured in deleteMirrorMaker ", ex);
// Persist the shortened list, then stop the running process.
450 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
451 OutputStream out = null;
453 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
454 mirrorMakerProperties.store(out, "");
455 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
456 } catch (IOException ex) {
457 logger.error("exception occured in deleteMirrorMaker ", ex);
462 } catch (IOException e) {
463 logger.error("exception occured in deleteMirrorMaker ", e);
468 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Loads {@code <mmagenthome>/etc/mmagent.config} into the agent's fields.
 *
 * <p>The {@code mirrormakers} property, when present, is deserialized from JSON into
 * {@code mirrorMakers}; when absent an empty list is created. {@code kafkahome},
 * {@code topicURL}, {@code topicname}, {@code mechid} and {@code grepLog} are copied from
 * the properties of the same name, and {@code password} is decrypted with
 * {@link BasicTextEncryptor} keyed by {@code secret}. Any exception is logged and not
 * rethrown.
 *
 * <p>NOTE(review): the declaration of {@code g} and the stream clean-up are not visible
 * in this listing (embedded line numbers skip values).
 */
472 private void loadProperties() {
473 InputStream input = null;
476 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
477 mirrorMakerProperties.load(input);
479 if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
// No MirrorMaker stored yet: start with an empty list.
480 this.mirrorMakers = new ListMirrorMaker();
// NOTE(review): the value fetched here is overwritten on the next line (dead store).
481 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
482 list = new ArrayList<>();
483 this.mirrorMakers.setListMirrorMaker(list);
485 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
486 ListMirrorMaker.class);
// getProperty returns null for a missing key, so each of these fields may become null.
489 this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
490 this.topicURL = mirrorMakerProperties.getProperty("topicURL");
491 this.topicname = mirrorMakerProperties.getProperty("topicname");
492 this.mechid = mirrorMakerProperties.getProperty("mechid");
493 this.grepLog = mirrorMakerProperties.getProperty("grepLog");
// The configured password is stored encrypted (see the -encrypt option of main).
495 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
496 textEncryptor.setPassword(secret);
497 this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
498 } catch (Exception ex) {
499 logger.error("exception occured in loadProperties ", ex);
504 } catch (IOException e) {
505 logger.error("exception occured in loadProperties ", e);
/**
 * Dispatches one message received from the agent topic.
 *
 * <p>The request type is taken from the first of these keys present in {@code object}:
 * {@code createMirrorMaker}, {@code updateMirrorMaker}, {@code deleteMirrorMaker},
 * {@code listAllMirrorMaker}, {@code updateWhiteList}, {@code listMirrorMaker}. The
 * create, update, delete and whitelist requests are deserialized from
 * {@code topicMessage} and passed to the matching method. For those and for
 * {@code listAllMirrorMaker}, the current list is published back to the topic tagged
 * with the request's messageID, which is then reset to an empty string.
 * {@code listMirrorMaker} messages are only logged, and so is any other message.
 *
 * <p>NOTE(review): the declaration of {@code g} and one statement in each handled branch
 * (embedded line numbers 520, 533, 541, 547, 555) are not visible in this listing, and
 * the end of the method may lie beyond the visible lines.
 *
 * @param object       the message parsed into a map; used only to detect the request type
 * @param topicMessage the raw JSON text; deserialized into the request classes
 * @throws Exception declared by the signature; propagated to the caller
 */
512 public void readAgent(LinkedTreeMap<?, ?> object, String topicMessage) throws Exception{
516 if (object.get("createMirrorMaker") != null) {
517 logger.info("Received createMirrorMaker request from topic");
518 CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
519 createMirrorMaker(m.getCreateMirrorMaker());
// Reply with the full list, tagged with the request's messageID for this publish only.
521 mirrorMakers.setMessageID(m.getMessageID());
522 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
523 mirrorMakers.setMessageID("");
524 } else if (object.get("updateMirrorMaker") != null) {
525 logger.info("Received updateMirrorMaker request from topic");
526 UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
// Inspect the raw JSON to tell an omitted numStreams from a supplied one; an omitted
// value is set to 0 so that updateMirrorMaker() leaves the stored value unchanged.
527 JSONObject json = new JSONObject(topicMessage);
528 JSONObject json2 = (JSONObject) json.get("updateMirrorMaker");
529 if (!json2.has("numStreams")) {
530 m.getUpdateMirrorMaker().setNumStreams(0);
532 updateMirrorMaker(m.getUpdateMirrorMaker());
534 mirrorMakers.setMessageID(m.getMessageID());
535 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
536 mirrorMakers.setMessageID("");
537 } else if (object.get("deleteMirrorMaker") != null) {
538 logger.info("Received deleteMirrorMaker request from topic");
539 DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
540 deleteMirrorMaker(m.getDeleteMirrorMaker());
542 mirrorMakers.setMessageID(m.getMessageID());
543 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
544 mirrorMakers.setMessageID("");
545 } else if (object.get("listAllMirrorMaker") != null) {
546 logger.info("Received listALLMirrorMaker request from topic");
// No request class here: the messageID is read straight from the parsed map.
548 mirrorMakers.setMessageID((String) object.get("messageID"));
549 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
550 mirrorMakers.setMessageID("");
551 } else if (object.get("updateWhiteList") != null) {
552 logger.info("Received updateWhiteList request from topic");
553 UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
554 updateWhiteList(m.getUpdateWhiteList());
556 mirrorMakers.setMessageID(m.getMessageID());
557 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
558 mirrorMakers.setMessageID("");
// NOTE(review): presumably these are the agent's own replies arriving on the same topic,
// which is why they are skipped - confirm against the ListMirrorMaker JSON shape.
559 } else if (object.get("listMirrorMaker") != null) {
560 logger.info("Received listMirrorMaker from topic, skipping messages");
562 logger.info("Received unknown request from topic");