1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
23 package com.att.nsa.dmaapMMAgent;
26 import java.io.FileInputStream;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.io.InputStream;
30 import java.io.OutputStream;
31 import java.util.ArrayList;
32 import java.util.Properties;
34 import org.apache.log4j.Level;
35 import org.apache.log4j.Logger;
36 import org.jasypt.util.text.BasicTextEncryptor;
38 import com.att.nsa.dmaapMMAgent.dao.CreateMirrorMaker;
39 import com.att.nsa.dmaapMMAgent.dao.DeleteMirrorMaker;
40 import com.att.nsa.dmaapMMAgent.dao.ListMirrorMaker;
41 import com.att.nsa.dmaapMMAgent.dao.MirrorMaker;
42 import com.att.nsa.dmaapMMAgent.dao.UpdateMirrorMaker;
43 import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList;
44 import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler;
45 import com.google.gson.Gson;
46 import com.google.gson.internal.LinkedTreeMap;
/**
 * Agent that keeps a list of MirrorMaker definitions, persists it in
 * {@code <mmagenthome>/etc/mmagent.config}, and acts on requests received from a topic.
 *
 * NOTE(review): the embedded line numbers in this listing have gaps, so some source lines are
 * not shown. Fields such as {@code topicURL}, {@code mechid} and {@code password} are used by
 * the methods of this class, but their declarations are not visible here.
 */
48 public class MirrorMakerAgent {
49 static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
// Configuration loaded from, and stored back to, <mmagenthome>/etc/mmagent.config.
50 Properties mirrorMakerProperties = new Properties();
// Known MirrorMaker definitions; null until loaded from the config or first created.
51 ListMirrorMaker mirrorMakers = null;
// Agent home directory, read from the MMAGENTHOME system property in checkStartup().
52 String mmagenthome = "";
// Kafka installation directory, read from the "kafkahome" config property.
53 String kafkahome = "";
// Topic name, read from the "topicname" config property.
55 String topicname = "";
// Helper used for every topic publish/subscribe call in this class.
58 TopicUtil topicUtil = new TopicUtil();
// NOTE(review): no use of this flag is visible in this listing; presumably it ends the
// polling loop in readAgentTopic() - confirm.
59 public boolean exitLoop = false;
// Password handed to BasicTextEncryptor for the -encrypt option and in loadProperties().
// NOTE(review): the secret is hard-coded in source; consider supplying it from the
// environment or a protected file instead.
60 private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
/**
 * Command-line entry point.
 *
 * With exactly two arguments whose first is "-encrypt", the second argument is encrypted with
 * the class-level secret and printed. The remaining visible code creates an agent and, when
 * checkStartup() succeeds, runs checkAgentProcess() followed by readAgentTopic().
 *
 * NOTE(review): some lines of this method are missing from the listing (for example the calls
 * that the usage and error strings are passed to, and any return statements), so the exact
 * control flow between the branches cannot be confirmed here.
 *
 * @param args optional command-line arguments; see the usage string below
 */
62 public static void main(String[] args) {
63 if (args != null && args.length == 2) {
64 if (args[0].equals("-encrypt")) {
65 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
66 textEncryptor.setPassword(secret);
// Despite its name, plainText holds the encrypted form of args[1].
67 String plainText = textEncryptor.encrypt(args[1]);
68 System.out.println("Encrypted Password is :" + plainText);
// Any other non-empty argument list: usage text (the call it belongs to is not visible).
71 } else if (args != null && args.length > 0) {
73 "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
76 MirrorMakerAgent agent = new MirrorMakerAgent();
77 if (agent.checkStartup()) {
78 logger.info("mmagent started, loading properties");
79 agent.checkAgentProcess();
80 agent.readAgentTopic();
// Startup-failure message; the branch header and the call it belongs to are not visible.
83 "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
/**
 * Validates the agent's environment before the main loop starts.
 *
 * Visible steps: read the MMAGENTHOME system property and open
 * {@code <mmagenthome>/etc/mmagent.config}; a Kafka-home check whose condition is not shown;
 * publish a test message to the configured topic; subscribe to that topic. Failures are logged.
 *
 * NOTE(review): several lines of this method (try/finally scaffolding, conditions and the
 * return statements) are missing from this listing, so the success and failure paths cannot
 * be fully confirmed here.
 *
 * @return presumably true when every check passes - the return statements are not visible
 */
87 private boolean checkStartup() {
88 FileInputStream input = null;
// The agent home comes from -DMMAGENTHOME; opening the config file proves it is readable.
90 this.mmagenthome = System.getProperty("MMAGENTHOME");
91 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
92 logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
93 } catch (IOException ex) {
95 mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex);
101 } catch (IOException e) {
102 logger.error(" IOException occers " + e);
// The two lines below are the remains of a commented-out probe for kafka-run-class.sh.
110 * input = new FileInputStream(kafkahome +
111 * "/bin/kafka-run-class.sh");
// NOTE(review): the condition guarding this throw is not visible in this listing.
114 throw new IOException();
116 logger.info("kakahome is set :" + kafkahome);
117 } catch (IOException ex) {
118 logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly" + ex);
124 } catch (IOException e) {
125 logger.error("IOException" + e);
// Publish a small test payload to the configured topic with the configured credentials.
129 String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}");
// NOTE(review): response is dereferenced without a null check here, unlike the subscribe
// result a few lines below.
130 if (response.startsWith("ERROR:")) {
131 logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
132 + this.topicURL + " Error is: " + response);
135 logger.info("Published to Topic :" + this.topicname + " Successfully");
// NOTE(review): the last two arguments are the publish response, whereas the equivalent call
// in readAgentTopic() passes mechid and password - confirm this is intended.
136 response = topicUtil.subscribeTopic(topicURL, topicname, "1", response, response);
137 if (response != null && response.startsWith("ERROR:")) {
138 logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
139 + this.topicURL + " Error is: " + response);
142 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
/**
 * Works with the per-agent properties file
 * {@code <mmagenthome>/etc/<agentName><propName>.properties}.
 *
 * The visible code tries to open that file. In the IOException handler it opens the shared
 * template {@code <mmagenthome>/etc/<propName>.properties}, sets connection properties from
 * {@code info} on a fresh Properties object, and opens the per-agent file for writing.
 *
 * NOTE(review): the load and store calls, the stream clean-up and the use of {@code refresh}
 * are not visible in this listing.
 *
 * @param agentName MirrorMaker name; used as the file-name prefix and as the consumer group.id
 * @param propName  "consumer" or "producer", as passed by the callers in this file
 * @param info      value stored as zookeeper.connect (consumer) or metadata.broker.list
 * @param refresh   callers pass true after create/update and false from checkAgentProcess();
 *                  presumably it guards the throw below so the file is regenerated - confirm
 */
146 private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) {
147 InputStream input = null;
148 OutputStream out = null;
// NOTE(review): the condition guarding this throw is not visible in this listing.
151 throw new IOException();
// Probe for an existing per-agent file.
153 input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
154 } catch (IOException ex) {
155 logger.error(" IOException will be handled " + ex);
// Fall back to the shared template for this property type.
157 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
158 Properties prop = new Properties();
160 if (propName.equals("consumer")) {
161 prop.setProperty("group.id", agentName);
162 prop.setProperty("zookeeper.connect", info);
// NOTE(review): the branch header for the next line is missing; presumably the non-consumer
// (producer) case.
164 prop.setProperty("metadata.broker.list", info);
166 out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
169 } catch (Exception e) {
170 logger.error("Exception at checkPropertiesFile " + e);
176 } catch (IOException e) {
177 logger.error("Exception occurred is " + e);
183 } catch (IOException e) {
185 logger.error("Exception is : " + e);
/**
 * Checks every known MirrorMaker process and records its status on the list entry.
 *
 * For an entry whose process is not running, the consumer and producer properties files are
 * checked and, when a non-empty whitelist is configured, the process is started and the
 * status set to "RESTARTING". The visible code also sets "STOPPED" and "RUNNING" on paths
 * whose branch headers are missing from this listing. All visible work after the first log
 * line is guarded by {@code mirrorMakers != null}.
 */
191 private void checkAgentProcess() {
192 logger.info("Checking MirrorMaker Process");
193 if (mirrorMakers != null) {
// The size is captured once; the visible loop body only replaces elements in place.
194 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
195 for (int i = 0; i < mirrorMakersCount; i++) {
196 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
197 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) {
// Not running: check both per-agent properties files (refresh=false) before a restart.
198 checkPropertiesFile(mm.name, "consumer", mm.consumer, false);
199 checkPropertiesFile(mm.name, "producer", mm.producer, false);
// Only start a MirrorMaker that has a whitelist configured.
201 if (mm.whitelist != null && !mm.whitelist.equals("")) {
203 "MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
204 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
205 mmagenthome + "/etc/" + mm.name + "consumer.properties",
206 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist);
207 mm.setStatus("RESTARTING");
// NOTE(review): branch header missing; presumably the else of the whitelist check.
210 logger.info("MirrorMaker " + mm.name + " is STOPPED");
211 mm.setStatus("STOPPED");
// NOTE(review): the call that can raise InterruptedException is not visible (presumably a
// sleep); the handler restores the thread's interrupt flag.
215 } catch (InterruptedException e) {
216 Thread.currentThread().interrupt();
218 mirrorMakers.getListMirrorMaker().set(i, mm);
// NOTE(review): branch header missing; the log text indicates the already-running case.
220 logger.info("MirrorMaker " + mm.name + " is running");
221 mm.setStatus("RUNNING");
222 mirrorMakers.getListMirrorMaker().set(i, mm);
// Leftover debugging output, kept commented out.
226 // Gson g = new Gson();
227 // System.out.println(g.toJson(mirrorMakers));
/**
 * Reads messages from the agent topic and hands each one to
 * {@link #readAgent(LinkedTreeMap, String)}.
 *
 * The visible code subscribes with a "60000" timeout argument, parses a non-null message into
 * a LinkedTreeMap and dispatches it. An exception from that step is logged either as a
 * shutdown (when connectionattempt is above 5) or as a retry; the counter is reset to 0 on a
 * path whose surrounding structure is not shown.
 *
 * NOTE(review): the loop statement, the Gson instance {@code g}, the counter increment and
 * the shutdown/sleep statements are not visible in this listing.
 */
230 public void readAgentTopic() {
// Counts consecutive failures; compared against 5 in the exception handler below.
232 int connectionattempt = 0;
234 logger.info("--------------------------------");
235 logger.info("Waiting for Messages for 60 secs");
236 String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password);
238 LinkedTreeMap<?, ?> object = null;
239 if (topicMessage != null) {
// Parse into a generic map first so readAgent() can test which request key is present.
241 object = g.fromJson(topicMessage, LinkedTreeMap.class);
243 // Cast the 1st item (since limit=1 and see the type of
245 readAgent(object, topicMessage);
246 } catch (Exception ex) {
248 if (connectionattempt > 5) {
249 logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage + ex);
// NOTE(review): branch header missing; presumably the else of the attempt-limit check.
252 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
253 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
257 // Check all MirrorMaker every min
258 connectionattempt = 0;
265 } catch (Exception e) {
266 logger.error("Exception at readAgentTopic : " + e);
/**
 * Dispatches one topic message according to which request key it contains.
 *
 * Handled keys: createMirrorMaker, updateMirrorMaker, deleteMirrorMaker, listAllMirrorMaker
 * and updateWhiteList. For create/update/delete/updateWhiteList the message is deserialized
 * into its request type, the matching handler is called, the request's messageID is copied
 * onto {@code mirrorMakers}, the list is published as JSON and the messageID is reset to "".
 * A listMirrorMaker message is only logged; a further log line reports unknown requests.
 *
 * NOTE(review): every publishTopic call below passes {@code topicMessage} as its first four
 * arguments, whereas checkStartup() passes topicURL, topicname, mechid and password in those
 * positions - this looks unintended; confirm against TopicUtil.publishTopic.
 * NOTE(review): the Gson instance {@code g} and one line per branch are missing from this
 * listing.
 *
 * @param object       parsed message; used to test which key is present and, for
 *                     listAllMirrorMaker, to read "messageID"
 * @param topicMessage raw message text that is deserialized into the request types
 */
271 public void readAgent(LinkedTreeMap<?, ?> object, String topicMessage) {
275 if (object.get("createMirrorMaker") != null) {
276 logger.info("Received createMirrorMaker request from topic");
277 CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
278 createMirrorMaker(m.getCreateMirrorMaker());
// Tag the reply with the request's messageID, publish it, then clear the tag.
280 mirrorMakers.setMessageID(m.getMessageID());
281 topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
282 mirrorMakers.setMessageID("");
283 } else if (object.get("updateMirrorMaker") != null) {
284 logger.info("Received updateMirrorMaker request from topic");
285 UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
286 updateMirrorMaker(m.getUpdateMirrorMaker());
288 mirrorMakers.setMessageID(m.getMessageID());
289 topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
290 mirrorMakers.setMessageID("");
291 } else if (object.get("deleteMirrorMaker") != null) {
292 logger.info("Received deleteMirrorMaker request from topic");
293 DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
294 deleteMirrorMaker(m.getDeleteMirrorMaker());
296 mirrorMakers.setMessageID(m.getMessageID());
297 topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
298 mirrorMakers.setMessageID("");
299 } else if (object.get("listAllMirrorMaker") != null) {
300 logger.info("Received listALLMirrorMaker request from topic");
// The messageID is read straight from the generic map here.
// NOTE(review): unlike the other branches, the messageID is not reset after publishing.
302 mirrorMakers.setMessageID((String) object.get("messageID"));
303 topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
304 } else if (object.get("updateWhiteList") != null) {
305 logger.info("Received updateWhiteList request from topic");
306 UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
307 updateWhiteList(m.getUpdateWhiteList());
309 mirrorMakers.setMessageID(m.getMessageID());
310 topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
311 mirrorMakers.setMessageID("");
312 } else if (object.get("listMirrorMaker") != null) {
// Logged and otherwise ignored in the visible code.
313 logger.info("Received listMirrorMaker from topic, skipping messages");
// NOTE(review): branch header missing; presumably the final else.
315 logger.info("Received unknown request from topic");
/**
 * Adds a MirrorMaker definition to the list and persists the list to the agent config.
 *
 * The visible code compares the new name against existing entries and logs when a match is
 * found. When nothing matched, the entry is appended, creating the list container first if
 * {@code mirrorMakers} is still null. It then calls checkPropertiesFile with refresh=true for
 * consumer and producer, stores the list as JSON under the "mirrormakers" property and
 * writes the properties to {@code <mmagenthome>/etc/mmagent.config}.
 *
 * NOTE(review): the statement that sets {@code exists}, any early return, the Gson instance
 * {@code g} and the stream clean-up are not visible in this listing.
 *
 * @param newMirrorMaker definition to add; matched against existing entries by name
 */
320 protected void createMirrorMaker(MirrorMaker newMirrorMaker) {
321 boolean exists = false;
322 if (mirrorMakers != null) {
323 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
324 for (int i = 0; i < mirrorMakersCount; i++) {
325 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
326 if (mm.name.equals(newMirrorMaker.name)) {
328 logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
333 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
334 if (exists == false && mirrorMakers != null) {
335 mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
336 } else if (exists == false && mirrorMakers == null) {
// First MirrorMaker ever: build the container and its list.
337 mirrorMakers = new ListMirrorMaker();
// NOTE(review): the value read here is overwritten on the next line (dead store), and the
// replacement uses a raw ArrayList.
338 ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
339 list = new ArrayList();
340 list.add(newMirrorMaker);
341 mirrorMakers.setListMirrorMaker(list);
343 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
344 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
// Persist the whole list as one JSON-valued property.
347 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
348 OutputStream out = null;
350 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
351 mirrorMakerProperties.store(out, "");
352 } catch (IOException ex) {
353 logger.error(" IOException Occered " + ex);
// NOTE(review): no handler body is visible for this catch; confirm the exception is logged.
358 } catch (IOException e) {
/**
 * Updates the consumer and producer settings of an existing MirrorMaker.
 *
 * The visible code finds the entry with the same name, copies the new consumer and producer
 * values onto it, calls checkPropertiesFile with refresh=true for both files, stores the list
 * as JSON under "mirrormakers" in {@code <mmagenthome>/etc/mmagent.config}, and stops the
 * MirrorMaker process. A "Not found" message is logged on a path whose condition is not shown.
 *
 * NOTE(review): the statement that sets {@code exists}, the Gson instance {@code g}, the call
 * that can raise InterruptedException and the stream clean-up are not visible in this listing.
 *
 * @param newMirrorMaker carries the name to look up and the new consumer/producer values
 */
365 private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
366 boolean exists = false;
367 if (mirrorMakers != null) {
368 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
369 for (int i = 0; i < mirrorMakersCount; i++) {
370 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
371 if (mm.name.equals(newMirrorMaker.name)) {
// Only consumer and producer are copied; other fields of the stored entry are kept.
373 mm.setConsumer(newMirrorMaker.getConsumer());
374 mm.setProducer(newMirrorMaker.getProducer());
375 mirrorMakers.getListMirrorMaker().set(i, mm);
376 logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
381 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
382 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
385 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
386 OutputStream out = null;
388 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
389 mirrorMakerProperties.store(out, "");
// The process is stopped after the config is stored; presumably checkAgentProcess() later
// restarts it with the new settings - confirm.
390 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
393 } catch (InterruptedException e) {
394 logger.log(Level.WARN, "Interrupted!", e);
395 Thread.currentThread().interrupt();
397 } catch (IOException ex) {
398 logger.error(" IOException Occered " + ex);
403 } catch (IOException e) {
404 logger.error(" IOException Occered " + e);
// NOTE(review): branch header missing; presumably reached when no entry matched the name.
409 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Replaces the whitelist of an existing MirrorMaker.
 *
 * The visible code finds the entry with the same name, sets its whitelist, stores the list as
 * JSON under "mirrormakers" in {@code <mmagenthome>/etc/mmagent.config}, and stops the
 * MirrorMaker process. A "Not found" message is logged on a path whose condition is not shown.
 *
 * NOTE(review): the statement that sets {@code exists}, the Gson instance {@code g}, the call
 * that can raise InterruptedException and the stream clean-up are not visible in this listing.
 *
 * @param newMirrorMaker carries the name to look up and the new whitelist value
 */
413 private void updateWhiteList(MirrorMaker newMirrorMaker) {
414 boolean exists = false;
415 if (mirrorMakers != null) {
416 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
417 for (int i = 0; i < mirrorMakersCount; i++) {
418 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
419 if (mm.name.equals(newMirrorMaker.name)) {
421 mm.setWhitelist(newMirrorMaker.whitelist);
422 mirrorMakers.getListMirrorMaker().set(i, mm);
423 logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
424 + newMirrorMaker.whitelist);
// Persist the whole list as one JSON-valued property.
430 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
431 OutputStream out = null;
433 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
434 mirrorMakerProperties.store(out, "");
// The process is stopped after the config is stored; presumably checkAgentProcess() later
// restarts it with the new whitelist - confirm.
435 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
438 } catch (InterruptedException e) {
439 logger.log(Level.WARN, "Interrupted!", e);
440 Thread.currentThread().interrupt();
442 } catch (IOException ex) {
443 logger.error("Exception at updateWhiteList : " + ex);
448 } catch (IOException e) {
450 logger.error("IOException occered " + e);
// NOTE(review): branch header missing; presumably reached when no entry matched the name.
455 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Removes a MirrorMaker definition and persists the updated list.
 *
 * The visible code removes the list entry with the same name, builds File objects for the
 * per-agent consumer and producer properties files, stores the list as JSON under
 * "mirrormakers" in {@code <mmagenthome>/etc/mmagent.config}, and stops the MirrorMaker
 * process. A "Not found" message is logged on a path whose condition is not shown.
 *
 * NOTE(review): the statement that sets {@code exists}, the operations performed on the two
 * File objects (presumably deletion), the Gson instance {@code g} and the stream clean-up are
 * not visible in this listing.
 *
 * @param newMirrorMaker carries the name of the MirrorMaker to remove
 */
459 private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
460 boolean exists = false;
461 if (mirrorMakers != null) {
462 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
463 for (int i = 0; i < mirrorMakersCount; i++) {
464 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
465 if (mm.name.equals(newMirrorMaker.name)) {
467 mirrorMakers.getListMirrorMaker().remove(i);
468 logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
// Forces the loop condition to fail, so iteration ends after the first removal and the
// stale captured size is never used to index the shortened list.
469 i = mirrorMakersCount;
475 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
476 File file = new File(path);
// NOTE(review): no handling is visible for this catch; confirm the exception is not swallowed.
478 } catch (Exception ex) {
481 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
482 File file = new File(path);
// NOTE(review): no handling is visible for this catch; confirm the exception is not swallowed.
484 } catch (Exception ex) {
487 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
488 OutputStream out = null;
490 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
491 mirrorMakerProperties.store(out, "");
492 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
493 } catch (IOException ex) {
// NOTE(review): the other methods in this class report IOExceptions through logger.error;
// printStackTrace here is inconsistent with them.
494 ex.printStackTrace();
499 } catch (IOException e) {
// NOTE(review): branch header missing; presumably reached when no entry matched the name.
505 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Loads {@code <mmagenthome>/etc/mmagent.config} into the agent's fields.
 *
 * The "mirrormakers" property, when present, is deserialized from JSON into
 * {@code mirrorMakers}; when absent, an empty list container is created. The kafkahome,
 * topicURL, topicname, mechid and password fields are then set from the properties of the
 * same names.
 *
 * NOTE(review): the Gson instance {@code g}, a branch header and the stream clean-up are not
 * visible in this listing.
 */
509 private void loadProperties() {
510 InputStream input = null;
513 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
514 mirrorMakerProperties.load(input);
516 if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
// No stored list yet: start with an empty one.
517 this.mirrorMakers = new ListMirrorMaker();
// NOTE(review): the value read here is overwritten on the next line (dead store).
518 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
519 list = new ArrayList<MirrorMaker>();
520 this.mirrorMakers.setListMirrorMaker(list);
// NOTE(review): branch header missing; presumably the else of the null check above.
522 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
523 ListMirrorMaker.class);
526 this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
527 this.topicURL = mirrorMakerProperties.getProperty("topicURL");
528 this.topicname = mirrorMakerProperties.getProperty("topicname");
529 this.mechid = mirrorMakerProperties.getProperty("mechid");
// NOTE(review): the encryptor is configured, but the decrypt call is commented out below, so
// the password property is used exactly as stored - confirm whether the stored value is
// expected to be encrypted.
531 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
532 textEncryptor.setPassword(secret);
534 // textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
535 this.password = mirrorMakerProperties.getProperty("password");
// NOTE(review): both handlers below contain only commented-out code in this listing, so
// load failures appear to be ignored silently - confirm.
536 } catch (IOException ex) {
537 // ex.printStackTrace();
542 } catch (IOException e) {
543 // e.printStackTrace();