1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.mr.dmaapMMAgent;
24 import java.io.BufferedReader;
25 import java.io.DataOutputStream;
27 import java.io.FileInputStream;
28 import java.io.FileOutputStream;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.InputStreamReader;
32 import java.io.OutputStream;
33 import java.net.HttpURLConnection;
35 import java.util.ArrayList;
36 import java.util.Properties;
37 import org.json.JSONObject;
38 import org.apache.log4j.Logger;
39 import org.jasypt.util.text.BasicTextEncryptor;
41 import org.onap.dmaap.mr.dmaapMMAgent.dao.CreateMirrorMaker;
42 import org.onap.dmaap.mr.dmaapMMAgent.dao.DeleteMirrorMaker;
43 import org.onap.dmaap.mr.dmaapMMAgent.dao.ListMirrorMaker;
44 import org.onap.dmaap.mr.dmaapMMAgent.dao.MirrorMaker;
45 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateMirrorMaker;
46 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateWhiteList;
47 import org.onap.dmaap.mr.dmaapMMAgent.utils.MirrorMakerProcessHandler;
48 import com.google.gson.Gson;
49 import com.google.gson.JsonArray;
50 import com.google.gson.internal.LinkedTreeMap;
52 import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny;
53 import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
/**
 * Agent that manages MirrorMaker processes from a properties file located at
 * {@code mmagenthome + "/etc/mmagent.config"}.
 *
 * <p>As far as this excerpt shows, {@code main} validates the environment
 * ({@code checkStartup}), checks the configured MirrorMaker processes
 * ({@code checkAgentProcess}) and then reads requests from a topic
 * ({@code readAgentTopic} / {@code readAgent}) that create, update, delete or
 * list MirrorMaker definitions.
 *
 * <p>NOTE(review): {@code topicURL}, {@code mechid}, {@code password} and
 * {@code grepLog} are used through {@code this.} in the methods below, but their
 * declarations are not visible in this excerpt.
 */
55 public class MirrorMakerAgent {
// Log4j logger for this class.
56 static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
// In-memory copy of mmagent.config; loaded in loadProperties and written back
// whenever the "mirrormakers" property changes.
57 Properties mirrorMakerProperties = new Properties();
// Current MirrorMaker definitions; serialized as JSON under the "mirrormakers" property.
58 ListMirrorMaker mirrorMakers = null;
// Base directory of the agent; config and generated *.properties files live under <mmagenthome>/etc.
59 String mmagenthome = "/opt";
// Kafka installation directory; populated from the "kafkahome" property in loadProperties.
60 String kafkahome = "";
// Name of the topic the agent publishes to and subscribes from ("topicname" property).
62 String topicname = "";
// Public flag; its readers are not visible in this excerpt.
// NOTE(review): presumably terminates the polling loop in readAgentTopic - confirm.
66 public boolean exitLoop = false;
// Helper used for all publishTopic / subscribeTopic calls.
67 TopicUtil topicUtil = new TopicUtil();
// Password handed to BasicTextEncryptor both for "-encrypt" in main and for decrypting the
// configured password in loadProperties.
// NOTE(review): this secret is hard-coded in source; consider externalizing it.
68 private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
/**
 * Command-line entry point.
 *
 * <p>With exactly two arguments where the first is {@code -encrypt}, encrypts the
 * second argument with the built-in secret and prints the result. Otherwise the
 * agent is created and, if {@code checkStartup()} succeeds, the MirrorMaker
 * processes are checked and the agent starts reading its topic.
 *
 * @param args either empty (run the agent) or {@code -encrypt <password>}
 */
70 public static void main(String[] args) {
71 if (args != null && args.length == 2) {
72 if (args[0].equals("-encrypt")) {
// Encrypt the supplied password with the same secret loadProperties uses to decrypt it.
73 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
74 textEncryptor.setPassword(secret);
// Despite its name, plainText holds the encrypted value returned by encrypt().
75 String plainText = textEncryptor.encrypt(args[1]);
76 System.out.println("Encrypted Password is :" + plainText);
// Any other non-empty argument list leads to the usage text below.
79 } else if (args != null && args.length > 0) {
81 "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
// Normal start: validate config, Kafka home and topic access before doing any work.
84 MirrorMakerAgent agent = new MirrorMakerAgent();
85 if (agent.checkStartup()) {
86 logger.info("mmagent started, loading properties");
// Initial pass over the configured MirrorMakers; any Exception is caught right below.
88 agent.checkAgentProcess();
89 } catch (Exception e) {
// Start consuming requests from the agent topic.
93 agent.readAgentTopic();
96 "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
/**
 * Verifies that the agent can run: the config file can be opened, the Kafka
 * launcher script exists under {@code kafkahome}, and a test message can be
 * published to and then read from the configured topic.
 *
 * <p>NOTE(review): the {@code return} statements are not visible in this excerpt;
 * {@code main} treats a {@code true} result as a successful startup.
 *
 * @return boolean startup verdict consumed by {@code main}
 */
100 private boolean checkStartup() {
101 FileInputStream input = null;
// The system-property lookup below is commented out, so mmagenthome keeps the value set on the
// field even though the error message further down tells the user to set -DMMAGENTHOME.
103 //this.mmagenthome = System.getProperty("MMAGENTHOME");
// Opening the file is only an existence/readability probe.
104 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
105 logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
106 } catch (IOException ex) {
107 logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file");
113 } catch (IOException e) {
114 logger.error("exception occured in checkStartup "+e);
// Second probe: the Kafka launcher script must exist under kafkahome.
121 input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
// NOTE(review): "kakahome" in this log message looks like a typo for "kafkahome".
122 logger.info("kakahome is set :" + kafkahome);
123 } catch (IOException ex) {
124 logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly");
130 } catch (IOException e) {
131 logger.error("exception occured in checkStartup "+e);
// Third probe: publish a small test message with the configured credentials.
135 String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}");
// NOTE(review): response is not null-checked here, unlike the subscribe check below - confirm
// publishTopic never returns null.
136 if (response.startsWith("ERROR:")) {
137 logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
138 + this.topicURL + " Error is: " + response);
141 logger.info("Published to Topic :" + this.topicname + " Successfully");
// NOTE(review): the publish response is passed as the 4th and 5th arguments, where readAgentTopic
// passes mechid and password - this looks like a bug; confirm against TopicUtil.subscribeTopic.
142 response = topicUtil.subscribeTopic(topicURL, topicname, "1", response, response);
143 if (response != null && response.startsWith("ERROR:")) {
144 logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
145 + this.topicURL + " Error is: " + response);
148 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
/**
 * Ensures the per-MirrorMaker properties file
 * {@code <mmagenthome>/etc/<mm.name><propName>.properties} is available,
 * generating it from the template {@code <mmagenthome>/etc/<propName>.properties}
 * when the specific file cannot be opened.
 *
 * @param mm       MirrorMaker whose name, consumer and producer values are written into the file
 * @param propName {@code "consumer"} selects the consumer settings; callers in this file
 *                 otherwise pass {@code "producer"}
 * @param refresh  NOTE(review): presumably forces regeneration through the explicit
 *                 {@code throw new IOException()} below - the guarding condition is not
 *                 visible in this excerpt
 */
152 private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) {
153 InputStream input = null;
154 OutputStream out = null;
// Throwing here routes control into the catch block that (re)creates the file.
157 throw new IOException();
// Probe for an existing MirrorMaker-specific file.
159 input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
160 } catch (IOException ex) {
// Fall back to the generic template for this kind of properties file.
162 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
163 Properties prop = new Properties();
165 if (propName.equals("consumer")) {
// Consumer side: group id is the MirrorMaker name, brokers come from mm.consumer.
166 prop.setProperty("group.id", mm.name);
168 prop.setProperty("bootstrap.servers", mm.consumer);
169 prop.setProperty("client.id", mm.name + "MM_consumer");
// Producer-side values: brokers come from mm.producer.
171 prop.setProperty("bootstrap.servers", mm.producer);
172 prop.setProperty("client.id", mm.name + "MM_producer");
// Target for the generated MirrorMaker-specific file.
175 out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
178 } catch (Exception e) {
179 logger.error("exception occured in checkPropertiesFile "+e);
185 } catch (IOException e) {
192 } catch (IOException e) {
193 logger.error("exception occured in checkPropertiesFile "+e);
/**
 * Walks all configured MirrorMakers and records a status on each one.
 *
 * <p>For an entry whose process check fails, the consumer and producer
 * properties files are verified and, when a non-empty whitelist is configured,
 * the MirrorMaker is started and marked {@code RESTARTING}. The visible code
 * also sets {@code STOPPED} and {@code RUNNING}; the branch structure around
 * those assignments is only partly visible in this excerpt.
 *
 * @throws Exception declared by the method; the throwing statements are not
 *                   visible in this excerpt
 */
199 private void checkAgentProcess() throws Exception {
200 logger.info("Checking MirrorMaker Process");
201 if (mirrorMakers != null) {
202 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
203 for (int i = 0; i < mirrorMakersCount; i++) {
204 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
// A false result means the MirrorMaker is considered not running.
205 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name, mm.enablelogCheck,
206 this.grepLog) == false) {
// Make sure both properties files exist before attempting a start (refresh = false).
207 checkPropertiesFile(mm, "consumer", false);
208 checkPropertiesFile(mm, "producer", false);
// Only start when there is something to mirror.
210 if (mm.whitelist != null && !mm.whitelist.equals("")) {
212 "MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
213 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
214 mmagenthome + "/etc/" + mm.name + "consumer.properties",
215 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.numStreams, mm.whitelist);
216 mm.setStatus("RESTARTING");
// NOTE(review): presumably the branch for a missing/empty whitelist - the else line is not visible.
219 logger.info("MirrorMaker " + mm.name + " is STOPPED");
220 mm.setStatus("STOPPED");
224 } catch (InterruptedException e) {
// Write the entry with its new status back into the list.
226 mirrorMakers.getListMirrorMaker().set(i, mm);
228 logger.info("MirrorMaker " + mm.name + " is running");
229 mm.setStatus("RUNNING");
230 mirrorMakers.getListMirrorMaker().set(i, mm);
234 // Gson g = new Gson();
235 // System.out.println(g.toJson(mirrorMakers));
/**
 * Reads request messages from the agent topic and hands them to
 * {@link #readAgent(LinkedTreeMap, String)}.
 *
 * <p>Each read uses a timeout argument of {@code "60000"}. Failures are counted in
 * {@code connectionattempt}; after more than 5 of them the agent logs that it
 * is shutting down. The counter is reset to 0 on the path marked "Check all
 * MirrorMaker every min".
 *
 * <p>NOTE(review): the loop construct, the declaration of {@code g} and the
 * statements inside several branches are not visible in this excerpt.
 */
238 public void readAgentTopic() {
// Number of failed attempts since the counter was last reset.
240 int connectionattempt = 0;
242 logger.info("--------------------------------");
243 logger.info("Waiting for Messages for 60 secs");
244 String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password);
246 LinkedTreeMap<?, ?> object = null;
247 if (topicMessage != null) {
249 // Check and parse if String object returned by consumer
251 // else use the jsonObject
// A leading quote means the payload is a JSON string wrapping the real JSON document; unwrap it first.
252 if (topicMessage.startsWith("\"")) {
253 topicMessage = g.fromJson(topicMessage.toString(), String.class);
// Generic map view of the message, used by readAgent to detect the request type.
255 object = g.fromJson(topicMessage, LinkedTreeMap.class);
257 // Cast the 1st item (since limit=1 and see the type of
259 readAgent(object, topicMessage);
260 } catch (Exception ex) {
// Give up after more than 5 failed attempts; otherwise log and retry.
262 if (connectionattempt > 5) {
263 logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
266 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
267 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
271 // Check all MirrorMaker every min
272 connectionattempt = 0;
280 } catch (Exception e) {
/**
 * Adds a new MirrorMaker definition unless one with the same name already
 * exists, regenerates its consumer and producer properties files, and stores
 * the updated list in {@code mmagent.config} under the "mirrormakers" property.
 *
 * <p>NOTE(review): the declaration of {@code g} and the statement that sets
 * {@code exists} are not visible in this excerpt.
 *
 * @param newMirrorMaker definition to add; matched against existing entries by {@code name}
 */
286 public void createMirrorMaker(MirrorMaker newMirrorMaker) {
287 boolean exists = false;
288 if (mirrorMakers != null) {
289 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
// Look for an existing entry with the same name.
290 for (int i = 0; i < mirrorMakersCount; i++) {
291 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
292 if (mm.name.equals(newMirrorMaker.name)) {
294 logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
299 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
300 if (exists == false && mirrorMakers != null) {
301 mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
302 } else if (exists == false && mirrorMakers == null) {
// No list yet: create the container and a fresh list holding only the new entry.
303 mirrorMakers = new ListMirrorMaker();
// NOTE(review): the value read on the next line is overwritten immediately afterwards.
304 ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
305 list = new ArrayList<MirrorMaker>();
306 list.add(newMirrorMaker);
307 mirrorMakers.setListMirrorMaker(list);
// refresh = true for both generated properties files.
309 checkPropertiesFile(newMirrorMaker, "consumer", true);
310 checkPropertiesFile(newMirrorMaker, "producer", true);
// Persist the whole list as JSON in the agent config.
313 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
314 OutputStream out = null;
316 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
317 mirrorMakerProperties.store(out, "");
318 } catch (IOException ex) {
// NOTE(review): uses printStackTrace while other methods in this class report through logger.
319 ex.printStackTrace();
324 } catch (IOException e) {
/**
 * Updates the stored MirrorMaker that has the same name as {@code newMirrorMaker}.
 *
 * <p>Consumer and producer are replaced only when the new values are non-null,
 * and numStreams only when it is at least 1; enablelogCheck is always copied.
 * The visible code then refreshes the properties files, persists the list to
 * {@code mmagent.config} and stops the MirrorMaker process by name. When no
 * entry matches, "MirrorMaker Not found" is logged.
 *
 * <p>NOTE(review): the declaration of {@code g}, the handling of {@code exists}
 * and the exact nesting of the statements are not visible in this excerpt.
 *
 * @param newMirrorMaker carries the name to match and the new field values
 */
331 private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
332 boolean exists = false;
333 if (mirrorMakers != null) {
334 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
335 for (int i = 0; i < mirrorMakersCount; i++) {
336 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
337 if (mm.name.equals(newMirrorMaker.name)) {
// Null means "not supplied": keep the stored consumer.
339 if (null != newMirrorMaker.getConsumer()) {
340 mm.setConsumer(newMirrorMaker.getConsumer());
// Same rule for the producer.
342 if (null != newMirrorMaker.getProducer()) {
343 mm.setProducer(newMirrorMaker.getProducer());
// Values below 1 leave the stored stream count untouched.
345 if (newMirrorMaker.getNumStreams() >= 1) {
346 mm.setNumStreams(newMirrorMaker.getNumStreams());
349 mm.setEnablelogCheck(newMirrorMaker.enablelogCheck);
351 mirrorMakers.getListMirrorMaker().set(i, mm);
353 logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
// Regenerate both properties files (refresh = true).
358 checkPropertiesFile(newMirrorMaker, "consumer", true);
359 checkPropertiesFile(newMirrorMaker, "producer", true);
362 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
363 OutputStream out = null;
365 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
366 mirrorMakerProperties.store(out, "");
// Stop the process by name.
// NOTE(review): presumably checkAgentProcess restarts it with the new settings - confirm.
367 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
370 } catch (InterruptedException e) {
372 } catch (IOException ex) {
373 ex.printStackTrace();
378 } catch (IOException e) {
384 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Replaces the whitelist of the stored MirrorMaker that has the same name as
 * {@code newMirrorMaker}, persists the list to {@code mmagent.config} and
 * stops the MirrorMaker process by name. When no entry matches,
 * "MirrorMaker Not found" is logged.
 *
 * <p>NOTE(review): the declaration of {@code g} and the handling of
 * {@code exists} are not visible in this excerpt.
 *
 * @param newMirrorMaker carries the name to match and the new whitelist
 */
388 private void updateWhiteList(MirrorMaker newMirrorMaker) {
389 boolean exists = false;
390 if (mirrorMakers != null) {
391 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
392 for (int i = 0; i < mirrorMakersCount; i++) {
393 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
394 if (mm.name.equals(newMirrorMaker.name)) {
// Unlike updateMirrorMaker, the whitelist is copied unconditionally (null included).
396 mm.setWhitelist(newMirrorMaker.whitelist);
397 mirrorMakers.getListMirrorMaker().set(i, mm);
398 logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
399 + newMirrorMaker.whitelist);
// Persist the whole list as JSON in the agent config.
405 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
406 OutputStream out = null;
408 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
409 mirrorMakerProperties.store(out, "");
// Stop the process by name.
// NOTE(review): presumably checkAgentProcess restarts it with the new whitelist - confirm.
410 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
413 } catch (InterruptedException e) {
415 } catch (IOException ex) {
416 ex.printStackTrace();
421 } catch (IOException e) {
427 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Removes the stored MirrorMaker that has the same name as
 * {@code newMirrorMaker}, persists the list to {@code mmagent.config} and
 * stops the MirrorMaker process by name. The method also builds the paths of
 * the entry's consumer and producer properties files. When no entry matches,
 * "MirrorMaker Not found" is logged.
 *
 * <p>NOTE(review): the statements that act on the two {@code File} objects are
 * not visible in this excerpt (presumably they delete the files), nor are the
 * declaration of {@code g} and the handling of {@code exists}.
 *
 * @param newMirrorMaker carries the name of the MirrorMaker to remove
 */
431 private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
432 boolean exists = false;
433 if (mirrorMakers != null) {
434 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
435 for (int i = 0; i < mirrorMakersCount; i++) {
436 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
437 if (mm.name.equals(newMirrorMaker.name)) {
439 mirrorMakers.getListMirrorMaker().remove(i);
440 logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
// Force the loop to end: the list is now shorter than the cached mirrorMakersCount.
441 i = mirrorMakersCount;
// Generated consumer properties file of the removed MirrorMaker.
447 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
448 File file = new File(path);
450 } catch (Exception ex) {
// Generated producer properties file of the removed MirrorMaker.
453 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
454 File file = new File(path);
456 } catch (Exception ex) {
// Persist the shortened list as JSON in the agent config.
459 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
460 OutputStream out = null;
462 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
463 mirrorMakerProperties.store(out, "");
464 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
465 } catch (IOException ex) {
466 ex.printStackTrace();
471 } catch (IOException e) {
477 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Loads {@code <mmagenthome>/etc/mmagent.config} into
 * {@code mirrorMakerProperties} and copies its values into the agent's fields.
 *
 * <p>The "mirrormakers" property is parsed from JSON into {@code mirrorMakers};
 * when the property is absent an empty list is created instead. The stored
 * password is decrypted with the built-in secret.
 *
 * <p>NOTE(review): the only statements visible inside the two catch blocks are
 * commented-out {@code printStackTrace} calls, so failures appear to be
 * ignored silently - confirm against the full file. The declaration of
 * {@code g} is not visible in this excerpt either.
 */
481 private void loadProperties() {
482 InputStream input = null;
485 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
486 mirrorMakerProperties.load(input);
// First run / empty config: start with an empty MirrorMaker list.
488 if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
489 this.mirrorMakers = new ListMirrorMaker();
// NOTE(review): the value read on the next line is overwritten immediately afterwards.
490 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
491 list = new ArrayList<MirrorMaker>();
492 this.mirrorMakers.setListMirrorMaker(list);
// Otherwise restore the list from its JSON form.
494 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
495 ListMirrorMaker.class);
// Plain settings; getProperty returns null for keys missing from the file.
498 this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
499 this.topicURL = mirrorMakerProperties.getProperty("topicURL");
500 this.topicname = mirrorMakerProperties.getProperty("topicname");
501 this.mechid = mirrorMakerProperties.getProperty("mechid");
502 this.grepLog = mirrorMakerProperties.getProperty("grepLog");
// The password is stored encrypted (see "-encrypt" in main) and decrypted with the same secret.
504 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
505 textEncryptor.setPassword(secret);
506 this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
507 } catch (Exception ex) {
508 // ex.printStackTrace();
513 } catch (IOException e) {
514 // e.printStackTrace();
/**
 * Dispatches one request message according to which top-level key it contains:
 * {@code createMirrorMaker}, {@code updateMirrorMaker}, {@code deleteMirrorMaker},
 * {@code listAllMirrorMaker} or {@code updateWhiteList}.
 *
 * <p>For each of those requests the current MirrorMaker list is published back
 * to the topic tagged with the request's message ID, after which the message ID
 * is reset to an empty string. Messages containing {@code listMirrorMaker}
 * are skipped; anything else is logged as unknown.
 *
 * <p>NOTE(review): the declaration of {@code g} and some statements between the
 * handler call and the publish are not visible in this excerpt; the end of the
 * method is not visible either.
 *
 * @param object       generic map view of the message, used only to detect the request type
 * @param topicMessage raw JSON of the message, re-parsed into the typed request class
 * @throws Exception declared by the method; the throwing statements are not
 *                   visible in this excerpt
 */
521 public void readAgent(LinkedTreeMap<?, ?> object, String topicMessage) throws Exception{
525 if (object.get("createMirrorMaker") != null) {
526 logger.info("Received createMirrorMaker request from topic");
527 CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
528 createMirrorMaker(m.getCreateMirrorMaker());
// Reply with the full list, tagged with the request's message ID, then clear the ID.
530 mirrorMakers.setMessageID(m.getMessageID());
531 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
532 mirrorMakers.setMessageID("");
533 } else if (object.get("updateMirrorMaker") != null) {
534 logger.info("Received updateMirrorMaker request from topic");
535 UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
// Inspect the raw JSON to see whether numStreams was actually supplied.
536 JSONObject json = new JSONObject(topicMessage);
537 JSONObject json2 = (JSONObject) json.get("updateMirrorMaker");
// When it was not supplied, force 0 so updateMirrorMaker (which only applies values >= 1)
// keeps the stored stream count.
538 if (!json2.has("numStreams")) {
539 m.getUpdateMirrorMaker().setNumStreams(0);
541 updateMirrorMaker(m.getUpdateMirrorMaker());
543 mirrorMakers.setMessageID(m.getMessageID());
544 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
545 mirrorMakers.setMessageID("");
546 } else if (object.get("deleteMirrorMaker") != null) {
547 logger.info("Received deleteMirrorMaker request from topic");
548 DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
549 deleteMirrorMaker(m.getDeleteMirrorMaker());
551 mirrorMakers.setMessageID(m.getMessageID());
552 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
553 mirrorMakers.setMessageID("");
554 } else if (object.get("listAllMirrorMaker") != null) {
555 logger.info("Received listALLMirrorMaker request from topic");
// No typed request class here: the message ID is taken straight from the generic map.
557 mirrorMakers.setMessageID((String) object.get("messageID"));
558 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
559 mirrorMakers.setMessageID("");
560 } else if (object.get("updateWhiteList") != null) {
561 logger.info("Received updateWhiteList request from topic");
562 UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
563 updateWhiteList(m.getUpdateWhiteList());
565 mirrorMakers.setMessageID(m.getMessageID());
566 topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
567 mirrorMakers.setMessageID("");
// NOTE(review): presumably these are the list responses this agent publishes itself on the same
// topic - confirm against ListMirrorMaker's JSON shape.
568 } else if (object.get("listMirrorMaker") != null) {
569 logger.info("Received listMirrorMaker from topic, skipping messages");
571 logger.info("Received unknown request from topic");