1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.mr.dmaapMMAgent;
25 import java.io.FileInputStream;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.OutputStream;
30 import java.util.ArrayList;
31 import java.util.Properties;
33 import org.apache.log4j.Logger;
34 import org.jasypt.util.text.BasicTextEncryptor;
35 import org.json.JSONObject;
36 import org.onap.dmaap.mr.dmaapMMAgent.dao.CreateMirrorMaker;
37 import org.onap.dmaap.mr.dmaapMMAgent.dao.DeleteMirrorMaker;
38 import org.onap.dmaap.mr.dmaapMMAgent.dao.ListMirrorMaker;
39 import org.onap.dmaap.mr.dmaapMMAgent.dao.MirrorMaker;
40 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateMirrorMaker;
41 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateWhiteList;
42 import org.onap.dmaap.mr.dmaapMMAgent.utils.MirrorMakerProcessHandler;
44 import com.google.gson.Gson;
45 import com.google.gson.internal.LinkedTreeMap;
47 public class MirrorMakerAgent {
48 static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
49 Properties mirrorMakerProperties = new Properties();
50 ListMirrorMaker mirrorMakers = null;
51 String mmagenthome = "/opt";
52 String kafkahome = "";
54 String topicname = "";
58 public boolean exitLoop = false;
59 TopicUtil topicUtil = new TopicUtil();
60 private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
62 public static void main(String[] args) {
63 if (args != null && args.length == 2) {
64 if (args[0].equals("-encrypt")) {
65 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
66 textEncryptor.setPassword(secret);
67 String plainText = textEncryptor.encrypt(args[1]);
68 System.out.println("Encrypted Password is :" + plainText);
71 } else if (args != null && args.length > 0) {
73 "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
76 MirrorMakerAgent agent = new MirrorMakerAgent();
77 if (agent.checkStartup()) {
78 logger.info("mmagent started, loading properties");
80 agent.checkAgentProcess();
81 } catch (Exception e) {
82 logger.error("exception occured in checkAgentProcess ", e);
84 agent.readAgentTopic();
87 "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
91 private boolean checkStartup() {
92 FileInputStream input = null;
94 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
95 logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
96 } catch (IOException ex) {
97 logger.error(mmagenthome + "/etc/mmagent.config not found.", ex);
103 } catch (IOException e) {
104 logger.error("exception occured in checkStartup "+e);
111 input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
112 logger.info("kafkahome is set :" + kafkahome);
113 } catch (IOException ex) {
114 logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly", ex);
120 } catch (IOException e) {
121 logger.error("exception occured in checkStartup "+e);
125 String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}");
126 if (response.startsWith("ERROR:")) {
127 logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
128 + this.topicURL + " Error is: " + response);
131 logger.info("Published to Topic :" + this.topicname + " Successfully");
132 response = topicUtil.subscribeTopic(topicURL, topicname, "1", mechid, password);
133 if (response != null && response.startsWith("ERROR:")) {
134 logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
135 + this.topicURL + " Error is: " + response);
138 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
142 private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) {
143 InputStream input = null;
144 OutputStream out = null;
147 throw new IOException();
149 input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
150 } catch (IOException ex) {
152 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
153 Properties prop = new Properties();
155 if (propName.equals("consumer")) {
156 prop.setProperty("group.id", mm.name);
158 prop.setProperty("bootstrap.servers", mm.consumer);
159 prop.setProperty("client.id", mm.name + "MM_consumer");
161 prop.setProperty("bootstrap.servers", mm.producer);
162 prop.setProperty("client.id", mm.name + "MM_producer");
165 out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
168 } catch (Exception e) {
169 logger.error("exception occured in checkPropertiesFile "+e);
175 } catch (IOException e) {
176 logger.error("exception occured in checkPropertiesFile ", e);
182 } catch (IOException e) {
183 logger.error("exception occured in checkPropertiesFile "+e);
189 private void checkAgentProcess() throws Exception {
190 logger.info("Checking MirrorMaker Process");
191 if (mirrorMakers != null) {
192 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
193 for (int i = 0; i < mirrorMakersCount; i++) {
194 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
195 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name, mm.enablelogCheck,
196 this.grepLog) == false) {
197 checkPropertiesFile(mm, "consumer", false);
198 checkPropertiesFile(mm, "producer", false);
200 if (mm.whitelist != null && !mm.whitelist.equals("")) {
202 "MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
203 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
204 mmagenthome + "/etc/" + mm.name + "consumer.properties",
205 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.numStreams, mm.whitelist);
206 mm.setStatus("RESTARTING");
209 logger.info("MirrorMaker " + mm.name + " is STOPPED");
210 mm.setStatus("STOPPED");
214 } catch (InterruptedException e) {
216 mirrorMakers.getListMirrorMaker().set(i, mm);
218 logger.info("MirrorMaker " + mm.name + " is running");
219 mm.setStatus("RUNNING");
220 mirrorMakers.getListMirrorMaker().set(i, mm);
224 // Gson g = new Gson();
225 // System.out.println(g.toJson(mirrorMakers));
228 public void readAgentTopic() {
230 int connectionattempt = 0;
232 logger.info("--------------------------------");
233 logger.info("Waiting for Messages for 60 secs");
234 String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password);
236 LinkedTreeMap<?, ?> object = null;
237 if (topicMessage != null) {
239 // Check and parse if String object returned by consumer
241 // else use the jsonObject
242 if (topicMessage.startsWith("\"")) {
243 topicMessage = g.fromJson(topicMessage.toString(), String.class);
245 object = g.fromJson(topicMessage, LinkedTreeMap.class);
247 // Cast the 1st item (since limit=1 and see the type of
249 readAgent(object, topicMessage);
250 } catch (Exception ex) {
252 if (connectionattempt > 5) {
253 logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
256 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
257 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
261 // Check all MirrorMaker every min
262 connectionattempt = 0;
270 } catch (Exception e) {
271 logger.error("exception occured in readAgentTopic ", e);
276 public void createMirrorMaker(MirrorMaker newMirrorMaker) {
277 boolean exists = false;
278 if (mirrorMakers != null) {
279 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
280 for (int i = 0; i < mirrorMakersCount; i++) {
281 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
282 if (mm.name.equals(newMirrorMaker.name)) {
284 logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
289 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
290 if (exists == false && mirrorMakers != null) {
291 mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
292 } else if (exists == false && mirrorMakers == null) {
293 mirrorMakers = new ListMirrorMaker();
294 ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
295 list = new ArrayList<MirrorMaker>();
296 list.add(newMirrorMaker);
297 mirrorMakers.setListMirrorMaker(list);
299 checkPropertiesFile(newMirrorMaker, "consumer", true);
300 checkPropertiesFile(newMirrorMaker, "producer", true);
303 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
304 OutputStream out = null;
306 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
307 mirrorMakerProperties.store(out, "");
308 } catch (IOException ex) {
309 logger.error("exception occured in createMirrorMaker ", ex);
314 } catch (IOException e) {
315 logger.error("exception occured in createMirrorMaker ", e);
321 private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
322 boolean exists = false;
323 if (mirrorMakers != null) {
324 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
325 for (int i = 0; i < mirrorMakersCount; i++) {
326 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
327 if (mm.name.equals(newMirrorMaker.name)) {
329 if (null != newMirrorMaker.getConsumer()) {
330 mm.setConsumer(newMirrorMaker.getConsumer());
332 if (null != newMirrorMaker.getProducer()) {
333 mm.setProducer(newMirrorMaker.getProducer());
335 if (newMirrorMaker.getNumStreams() >= 1) {
336 mm.setNumStreams(newMirrorMaker.getNumStreams());
339 mm.setEnablelogCheck(newMirrorMaker.enablelogCheck);
341 mirrorMakers.getListMirrorMaker().set(i, mm);
343 logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
348 checkPropertiesFile(newMirrorMaker, "consumer", true);
349 checkPropertiesFile(newMirrorMaker, "producer", true);
352 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
353 OutputStream out = null;
355 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
356 mirrorMakerProperties.store(out, "");
357 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
360 } catch (InterruptedException e) {
362 } catch (IOException ex) {
363 logger.error("exception occured in updateMirrorMaker ", ex);
368 } catch (IOException e) {
369 logger.error("exception occured in updateMirrorMaker ", e);
374 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
378 private void updateWhiteList(MirrorMaker newMirrorMaker) {
379 boolean exists = false;
380 if (mirrorMakers != null) {
381 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
382 for (int i = 0; i < mirrorMakersCount; i++) {
383 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
384 if (mm.name.equals(newMirrorMaker.name)) {
386 mm.setWhitelist(newMirrorMaker.whitelist);
387 mirrorMakers.getListMirrorMaker().set(i, mm);
388 logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
389 + newMirrorMaker.whitelist);
395 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
396 OutputStream out = null;
398 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
399 mirrorMakerProperties.store(out, "");
400 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
403 } catch (InterruptedException e) {
405 } catch (IOException ex) {
406 logger.error("exception occured in updateWhiteList ", ex);
411 } catch (IOException e) {
412 logger.error("exception occured in updateWhiteList ", e);
417 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
421 private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
422 boolean exists = false;
423 if (mirrorMakers != null) {
424 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
425 for (int i = 0; i < mirrorMakersCount; i++) {
426 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
427 if (mm.name.equals(newMirrorMaker.name)) {
429 mirrorMakers.getListMirrorMaker().remove(i);
430 logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
431 i = mirrorMakersCount;
437 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
438 File file = new File(path);
440 } catch (Exception ex) {
441 logger.error("exception occured in deleteMirrorMaker ", ex);
444 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
445 File file = new File(path);
447 } catch (Exception ex) {
448 logger.error("exception occured in deleteMirrorMaker ", ex);
451 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
452 OutputStream out = null;
454 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
455 mirrorMakerProperties.store(out, "");
456 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
457 } catch (IOException ex) {
458 logger.error("exception occured in deleteMirrorMaker ", ex);
463 } catch (IOException e) {
464 logger.error("exception occured in deleteMirrorMaker ", e);
469 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
473 private void loadProperties() {
474 InputStream input = null;
477 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
478 mirrorMakerProperties.load(input);
480 if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
481 this.mirrorMakers = new ListMirrorMaker();
482 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
483 list = new ArrayList<>();
484 this.mirrorMakers.setListMirrorMaker(list);
486 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
487 ListMirrorMaker.class);
490 this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
491 this.topicURL = mirrorMakerProperties.getProperty("topicURL");
492 this.topicname = mirrorMakerProperties.getProperty("topicname");
493 this.mechid = mirrorMakerProperties.getProperty("mechid");
494 this.grepLog = mirrorMakerProperties.getProperty("grepLog");
496 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
497 textEncryptor.setPassword(secret);
498 this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
499 } catch (Exception ex) {
500 logger.error("exception occured in loadProperties ", ex);
505 } catch (IOException e) {
506 logger.error("exception occured in loadProperties ", e);
513 public void readAgent(LinkedTreeMap<?, ?> object, String topicMessage) throws Exception{
517 if (object.get("createMirrorMaker") != null) {
518 logger.info("Received createMirrorMaker request from topic");
519 CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
520 createMirrorMaker(m.getCreateMirrorMaker());
522 mirrorMakers.setMessageID(m.getMessageID());
523 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
524 mirrorMakers.setMessageID("");
525 } else if (object.get("updateMirrorMaker") != null) {
526 logger.info("Received updateMirrorMaker request from topic");
527 UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
528 JSONObject json = new JSONObject(topicMessage);
529 JSONObject json2 = (JSONObject) json.get("updateMirrorMaker");
530 if (!json2.has("numStreams")) {
531 m.getUpdateMirrorMaker().setNumStreams(0);
533 updateMirrorMaker(m.getUpdateMirrorMaker());
535 mirrorMakers.setMessageID(m.getMessageID());
536 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
537 mirrorMakers.setMessageID("");
538 } else if (object.get("deleteMirrorMaker") != null) {
539 logger.info("Received deleteMirrorMaker request from topic");
540 DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
541 deleteMirrorMaker(m.getDeleteMirrorMaker());
543 mirrorMakers.setMessageID(m.getMessageID());
544 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
545 mirrorMakers.setMessageID("");
546 } else if (object.get("listAllMirrorMaker") != null) {
547 logger.info("Received listALLMirrorMaker request from topic");
549 mirrorMakers.setMessageID((String) object.get("messageID"));
550 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
551 mirrorMakers.setMessageID("");
552 } else if (object.get("updateWhiteList") != null) {
553 logger.info("Received updateWhiteList request from topic");
554 UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
555 updateWhiteList(m.getUpdateWhiteList());
557 mirrorMakers.setMessageID(m.getMessageID());
558 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
559 mirrorMakers.setMessageID("");
560 } else if (object.get("listMirrorMaker") != null) {
561 logger.info("Received listMirrorMaker from topic, skipping messages");
563 logger.info("Received unknown request from topic");