1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
23 package com.att.nsa.dmaapMMAgent;
25 import java.io.BufferedReader;
26 import java.io.DataOutputStream;
28 import java.io.FileInputStream;
29 import java.io.FileOutputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.InputStreamReader;
33 import java.io.OutputStream;
34 import java.net.HttpURLConnection;
36 import java.util.ArrayList;
37 import java.util.Properties;
39 import org.apache.log4j.Level;
40 import org.apache.log4j.Logger;
41 import org.jasypt.util.text.BasicTextEncryptor;
43 import com.att.nsa.dmaapMMAgent.dao.CreateMirrorMaker;
44 import com.att.nsa.dmaapMMAgent.dao.DeleteMirrorMaker;
45 import com.att.nsa.dmaapMMAgent.dao.ListMirrorMaker;
46 import com.att.nsa.dmaapMMAgent.dao.MirrorMaker;
47 import com.att.nsa.dmaapMMAgent.dao.UpdateMirrorMaker;
48 import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList;
49 import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler;
50 import com.google.gson.Gson;
51 import com.google.gson.internal.LinkedTreeMap;
52 import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny;
53 import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
// Agent class for managing MirrorMaker processes; its state is declared in the fields below.
// NOTE(review): the class line ends with a block-comment opener. No closing marker is visible
// in this part of the file, so every declaration that follows appears to be commented out --
// confirm where (and whether) the comment is closed before relying on any of this code.
55 public class MirrorMakerAgent {/*
// log4j logger used by every method of this class.
56 static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
// Agent configuration; loaded from and stored to <mmagenthome>/etc/mmagent.config elsewhere in this class.
57 Properties mirrorMakerProperties = new Properties();
// Known MirrorMaker definitions; null until populated (see loadProperties / createMirrorMaker).
58 ListMirrorMaker mirrorMakers = null;
// Agent home directory; assigned from the MMAGENTHOME system property in checkStartup().
59 String mmagenthome = "";
// Kafka installation directory; assigned from the "kafkahome" property in loadProperties().
60 String kafkahome = "";
// Name of the topic the agent publishes to and reads from; assigned from the "topicname" property.
62 String topicname = "";
// Key handed to BasicTextEncryptor.setPassword() in main() and loadProperties().
// NOTE(review): the encryption secret is hard-coded in source; anyone with the code can decrypt
// values produced by "-encrypt". Consider supplying it from outside the code base.
65 private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
// Command-line entry point.
//   args: with exactly two arguments where the first is "-encrypt", the second argument is
//         encrypted with BasicTextEncryptor (keyed by `secret`) and printed to stdout.
//         Any other non-empty argument list selects the usage-text branch.
// The visible startup path creates an agent, runs checkStartup() and, when that returns true,
// calls checkAgentProcess() followed by readAgentTopic().
67 public static void main(String[] args) {
68 if (args != null && args.length == 2) {
69 if (args[0].equals("-encrypt")) {
70 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
71 textEncryptor.setPassword(secret);
// NOTE(review): despite its name, plainText holds the value returned by encrypt().
72 String plainText = textEncryptor.encrypt(args[1]);
73 System.out.println("Encrypted Password is :" + plainText);
// Reached when the argument count is not exactly two but at least one argument was given.
76 } else if (args != null && args.length > 0) {
78 "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
// Normal startup: validate the environment first, then start processing.
81 MirrorMakerAgent agent = new MirrorMakerAgent();
82 if (agent.checkStartup()) {
83 logger.info("mmagent started, loading properties");
84 agent.checkAgentProcess();
85 agent.readAgentTopic();
88 "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
// Startup validation. The visible steps are:
//   1. read MMAGENTHOME and open <mmagenthome>/etc/mmagent.config,
//   2. open <kafkahome>/bin/kafka-run-class.sh,
//   3. publish a test message to the topic, then subscribe once with timeout "1".
// Each failure is logged. Returns a boolean; the return statements are not visible in this
// part of the file, so the exact success/failure mapping should be confirmed.
92 private boolean checkStartup() {
93 FileInputStream input = null;
// Step 1: locate the agent configuration file under the agent home directory.
95 this.mmagenthome = System.getProperty("MMAGENTHOME");
96 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
97 logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
98 } catch (IOException ex) {
99 logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex);
105 } catch (IOException e) {
106 logger.error(" IOException occers " + e);
// Step 2: verify the Kafka launcher script exists under kafkahome.
113 input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
// The condition guarding this throw is not visible here; the IOException is handled by the catch below.
115 throw new IOException();
117 logger.info("kakahome is set :" + kafkahome);
118 } catch (IOException ex) {
119 logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly" + ex);
125 } catch (IOException e) {
126 logger.error("IOException" + e);
// Step 3a: publish a small JSON test message; publishTopic() returns a string starting with
// "ERROR:" when it catches an exception.
// NOTE(review): response is dereferenced without a null check here, whereas the subscribe
// response below is null-checked.
130 String response = publishTopic("{\"test\":\"test\"}");
131 if (response.startsWith("ERROR:")) {
132 logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
133 + this.topicURL + " Error is: " + response);
136 logger.info("Published to Topic :" + this.topicname + " Successfully");
// Step 3b: subscribe with timeout value "1" to verify read access to the topic.
137 response = subscribeTopic("1");
138 if (response != null && response.startsWith("ERROR:")) {
139 logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
140 + this.topicURL + " Error is: " + response);
143 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
// Ensures the per-agent properties file <mmagenthome>/etc/<agentName><propName>.properties is
// available, building it from the template <mmagenthome>/etc/<propName>.properties when the
// per-agent file cannot be opened (or when the IOException below is thrown).
//   agentName - MirrorMaker name; used as file-name prefix and as "group.id" for consumers
//   propName  - compared against "consumer"; also forms part of both file names
//   info      - value written to "zookeeper.connect" (consumer) or "metadata.broker.list"
//   refresh   - not referenced on any visible line; presumably it guards the explicit
//               `throw new IOException()` to force regeneration -- TODO confirm
147 private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) {
148 InputStream input = null;
149 OutputStream out = null;
152 throw new IOException();
154 input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
// The IOException is used as the signal to (re)create the per-agent file from the template.
155 } catch (IOException ex) {
156 logger.error(" IOException will be handled " + ex);
158 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
159 Properties prop = new Properties();
// Consumer files get group.id and zookeeper.connect; the other visible branch sets metadata.broker.list.
161 if (propName.equals("consumer")) {
162 prop.setProperty("group.id", agentName);
163 prop.setProperty("zookeeper.connect", info);
165 prop.setProperty("metadata.broker.list", info);
// Output goes to the per-agent file name (same path that was tried first above).
167 out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
170 } catch (Exception e) {
171 logger.error("Exception at checkPropertiesFile " +e);
177 } catch (IOException e) {
178 logger.error("Exception occurred is " +e);
184 } catch (IOException e) {
186 logger.error("Exception is : "+e);
// Walks the configured MirrorMakers and refreshes each one's status. For an entry whose process
// check returns false, the consumer/producer properties files are verified and, when a non-empty
// whitelist is configured, the process is started again and marked "RESTARTING"; the other
// visible statuses are "STOPPED" and "RUNNING". Does nothing when mirrorMakers is null.
192 private void checkAgentProcess() {
193 logger.info("Checking MirrorMaker Process");
194 if (mirrorMakers != null) {
195 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
196 for (int i = 0; i < mirrorMakersCount; i++) {
197 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
198 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) {
// Make sure both properties files exist before attempting a restart (refresh = false).
199 checkPropertiesFile(mm.name, "consumer", mm.consumer, false);
200 checkPropertiesFile(mm.name, "producer", mm.producer, false);
// A restart is only attempted when a whitelist is configured.
202 if (mm.whitelist != null && !mm.whitelist.equals("")) {
203 logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
204 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
205 mmagenthome + "/etc/" + mm.name + "consumer.properties",
206 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist);
207 mm.setStatus("RESTARTING");
210 logger.info("MirrorMaker " + mm.name + " is STOPPED");
211 mm.setStatus("STOPPED");
// The interrupted flag is restored; the blocking call that can be interrupted is not visible here.
215 } catch (InterruptedException e) {
216 Thread.currentThread().interrupt();
// Write the updated entry back at the same index.
218 mirrorMakers.getListMirrorMaker().set(i, mm);
220 logger.info("MirrorMaker " + mm.name + " is running");
221 mm.setStatus("RUNNING");
222 mirrorMakers.getListMirrorMaker().set(i, mm);
// Disabled debug output of the full MirrorMaker list.
226 // Gson g = new Gson();
227 // System.out.println(g.toJson(mirrorMakers));
// Performs an HTTP GET against <topicURL>/events/<topicname>/mirrormakeragent/1 with the given
// timeout and Basic authentication (mechid:password), concatenates the response lines and
// parses them as a JSON string array.
//   timeout - appended verbatim to the "timeout=" query parameter
// Returns the first array element when the array is non-empty. On any exception returns a string
// starting with "ERROR:" that includes the exception message and the response read so far.
// The return for an empty array is not visible here (checkStartup null-checks the result).
230 private String subscribeTopic(String timeout) {
231 String response = "";
// The rest of this expression continues on a line that is not visible here.
233 String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout
// NOTE(review): getBytes() without a charset uses the platform default; Base64 here is the
// JDK-internal xerces class imported at the top of the file.
235 String authString = this.mechid + ":" + this.password;
236 String authStringEnc = Base64.encode(authString.getBytes());
237 URL url = new URL(requestURL);
238 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
239 connection.setRequestMethod("GET");
// NOTE(review): setDoOutput(true) is set although no request body is written on the visible lines.
240 connection.setDoOutput(true);
241 connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
242 connection.setRequestProperty("Content-Type", "application/json");
243 InputStream content = (InputStream) connection.getInputStream();
244 BufferedReader in = new BufferedReader(new InputStreamReader(content));
// Accumulate the whole response body (line terminators are dropped by readLine()).
247 while ((line = in.readLine()) != null) {
248 response = response + line;
251 // get message as JSON String Array
252 String[] topicMessage = g.fromJson(response, String[].class);
253 if (topicMessage.length != 0) {
254 return topicMessage[0];
256 } catch (Exception e) {
257 logger.error(" Exception Occered " + e);
258 return "ERROR:" + e.getMessage() + " Server Response is:" + response;
// Performs an HTTP POST of `message` to <topicURL>/events/<topicname> with Basic authentication
// (mechid:password) and reads the response body line by line.
//   message - request body, sent with Content-Type application/json
// On any exception returns a string starting with "ERROR:" followed by the localized exception
// message. The normal-path return statement is not visible here.
263 private String publishTopic(String message) {
265 String requestURL = this.topicURL + "/events/" + this.topicname;
266 String authString = this.mechid + ":" + this.password;
267 String authStringEnc = Base64.encode(authString.getBytes());
268 URL url = new URL(requestURL);
269 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
270 connection.setRequestMethod("POST");
271 connection.setDoOutput(true);
272 connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
273 connection.setRequestProperty("Content-Type", "application/json");
// NOTE(review): Content-Length is taken from the character count, while the body below is
// written as platform-charset bytes; the two differ for non-ASCII messages.
274 connection.setRequestProperty("Content-Length", Integer.toString(message.length()));
275 DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
276 wr.write(message.getBytes());
// Read the server's reply.
278 InputStream content = (InputStream) connection.getInputStream();
279 BufferedReader in = new BufferedReader(new InputStreamReader(content));
281 String response = "";
282 while ((line = in.readLine()) != null) {
283 response = response + line;
287 } catch (Exception e) {
288 logger.error(" Exception Occered " + e);
289 return "ERROR:" + e.getLocalizedMessage();
// Message-handling routine: subscribes to the agent topic with a 60000 timeout value, parses the
// received message as a JSON object and dispatches on which request key is present
// (createMirrorMaker, updateMirrorMaker, deleteMirrorMaker, listAllMirrorMaker, updateWhiteList).
// After each handled request the current MirrorMaker list is published back to the topic tagged
// with the request's messageID, and the messageID is then reset to "".
// Failures are counted in connectionattempt; the log text indicates up to 5 retries before
// shutdown. The looping and sleeping statements are not visible in this part of the file.
293 private void readAgentTopic() {
295 int connectionattempt = 0;
297 logger.info("--------------------------------");
298 logger.info("Waiting for Messages for 60 secs");
299 String topicMessage = subscribeTopic("60000");
301 LinkedTreeMap<?, ?> object = null;
302 if (topicMessage != null) {
// Parse generically first so the request type can be detected from the keys present.
304 object = g.fromJson(topicMessage, LinkedTreeMap.class);
306 // Cast the 1st item (since limit=1 and see the type of
308 if (object.get("createMirrorMaker") != null) {
309 logger.info("Received createMirrorMaker request from topic");
310 CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
311 createMirrorMaker(m.getCreateMirrorMaker());
// Reply with the full list, tagged with the request's messageID, then clear the tag.
313 mirrorMakers.setMessageID(m.getMessageID());
314 publishTopic(g.toJson(mirrorMakers));
315 mirrorMakers.setMessageID("");
316 } else if (object.get("updateMirrorMaker") != null) {
317 logger.info("Received updateMirrorMaker request from topic");
318 UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
319 updateMirrorMaker(m.getUpdateMirrorMaker());
321 mirrorMakers.setMessageID(m.getMessageID());
322 publishTopic(g.toJson(mirrorMakers));
323 mirrorMakers.setMessageID("");
324 } else if (object.get("deleteMirrorMaker") != null) {
325 logger.info("Received deleteMirrorMaker request from topic");
326 DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
327 deleteMirrorMaker(m.getDeleteMirrorMaker());
329 mirrorMakers.setMessageID(m.getMessageID());
330 publishTopic(g.toJson(mirrorMakers));
331 mirrorMakers.setMessageID("");
// List request: no state change, the messageID is read straight from the generic map.
332 } else if (object.get("listAllMirrorMaker") != null) {
333 logger.info("Received listALLMirrorMaker request from topic");
335 mirrorMakers.setMessageID((String) object.get("messageID"));
336 publishTopic(g.toJson(mirrorMakers));
337 mirrorMakers.setMessageID("");
338 } else if (object.get("updateWhiteList") != null) {
339 logger.info("Received updateWhiteList request from topic");
340 UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
341 updateWhiteList(m.getUpdateWhiteList());
343 mirrorMakers.setMessageID(m.getMessageID());
344 publishTopic(g.toJson(mirrorMakers));
345 mirrorMakers.setMessageID("");
// Messages carrying "listMirrorMaker" are only logged and skipped; presumably these are the
// replies this agent publishes itself -- TODO confirm against the ListMirrorMaker JSON shape.
346 } else if (object.get("listMirrorMaker") != null) {
347 logger.info("Received listMirrorMaker from topic, skipping messages");
349 logger.info("Received unknown request from topic");
// Any failure while handling a message is treated as a connection problem and counted.
351 } catch (Exception ex) {
353 if (connectionattempt > 5) {
354 logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage + ex);
357 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
358 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
362 // Check all MirrorMaker every min
363 connectionattempt = 0;
368 } catch (Exception e) {
369 logger.error("Exception at readAgentTopic : " + e);
// Registers a new MirrorMaker definition. The existing list is searched by name; when the list
// is absent a new ListMirrorMaker is created. The consumer/producer properties files are then
// (re)generated with refresh = true and the list is persisted as JSON under the "mirrormakers"
// key of <mmagenthome>/etc/mmagent.config.
//   newMirrorMaker - definition to add; its name, consumer and producer fields are read
374 protected void createMirrorMaker(MirrorMaker newMirrorMaker) {
375 boolean exists = false;
376 if (mirrorMakers != null) {
377 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
378 for (int i = 0; i < mirrorMakersCount; i++) {
379 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
380 if (mm.name.equals(newMirrorMaker.name)) {
382 logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
387 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
388 if (exists == false && mirrorMakers != null) {
389 mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
390 } else if (exists == false && mirrorMakers == null) {
391 mirrorMakers = new ListMirrorMaker();
// NOTE(review): the list read from the getter is discarded by the next assignment, and the
// replacement is created with a raw ArrayList type.
392 ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
393 list = new ArrayList();
394 list.add(newMirrorMaker);
395 mirrorMakers.setListMirrorMaker(list);
// refresh = true is passed for both properties files.
397 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
398 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
// Persist the updated list into the agent configuration file.
401 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
402 OutputStream out = null;
404 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
405 mirrorMakerProperties.store(out, "");
406 } catch (IOException ex) {
407 logger.error(" IOException Occered " + ex);
// NOTE(review): no handling is visible for this IOException -- confirm it is not silently dropped.
412 } catch (IOException e) {
// Updates the consumer and producer settings of the MirrorMaker whose name matches
// newMirrorMaker.name. The visible follow-up steps regenerate both properties files
// (refresh = true), persist the list to <mmagenthome>/etc/mmagent.config and stop the
// MirrorMaker process via MirrorMakerProcessHandler. A "Not found" message is logged on the
// path where no entry matched (the guarding condition is not visible here).
//   newMirrorMaker - carries the name to match and the new consumer/producer values
419 private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
420 boolean exists = false;
421 if (mirrorMakers != null) {
422 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
423 for (int i = 0; i < mirrorMakersCount; i++) {
424 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
425 if (mm.name.equals(newMirrorMaker.name)) {
// Copy the new endpoints onto the stored entry and write it back at the same index.
427 mm.setConsumer(newMirrorMaker.getConsumer());
428 mm.setProducer(newMirrorMaker.getProducer());
429 mirrorMakers.getListMirrorMaker().set(i, mm);
430 logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
435 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
436 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
439 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
440 OutputStream out = null;
442 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
443 mirrorMakerProperties.store(out, "");
// Stop the process after the configuration has been stored; presumably checkAgentProcess()
// restarts it with the new settings -- TODO confirm.
444 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
// The interrupted flag is restored after logging; the interruptible call is not visible here.
447 } catch (InterruptedException e) {
448 logger.log(Level.WARN, "Interrupted!", e);
449 Thread.currentThread().interrupt();
451 } catch (IOException ex) {
452 logger.error(" IOException Occered " + ex);
457 } catch (IOException e) {
458 logger.error(" IOException Occered " + e);
463 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
// Replaces the whitelist of the MirrorMaker whose name matches newMirrorMaker.name, persists the
// list to <mmagenthome>/etc/mmagent.config and stops the MirrorMaker process via
// MirrorMakerProcessHandler. A "Not found" message is logged on the path where no entry matched
// (the guarding condition is not visible here).
//   newMirrorMaker - carries the name to match and the new whitelist value
467 private void updateWhiteList(MirrorMaker newMirrorMaker) {
468 boolean exists = false;
469 if (mirrorMakers != null) {
470 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
471 for (int i = 0; i < mirrorMakersCount; i++) {
472 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
473 if (mm.name.equals(newMirrorMaker.name)) {
// Store the new whitelist on the matching entry and write it back at the same index.
475 mm.setWhitelist(newMirrorMaker.whitelist);
476 mirrorMakers.getListMirrorMaker().set(i, mm);
477 logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
478 + newMirrorMaker.whitelist);
// Persist the updated list, then stop the running process.
484 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
485 OutputStream out = null;
487 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
488 mirrorMakerProperties.store(out, "");
489 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
// The interrupted flag is restored after logging; the interruptible call is not visible here.
492 } catch (InterruptedException e) {
493 logger.log(Level.WARN, "Interrupted!", e);
494 Thread.currentThread().interrupt();
496 } catch (IOException ex) {
497 logger.error("Exception at updateWhiteList : " + ex);
502 } catch (IOException e) {
504 logger.error("IOException occered " + e);
509 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
// Removes the MirrorMaker whose name matches newMirrorMaker.name from the list. The visible
// follow-up steps build the paths of its consumer and producer properties files, persist the
// list to <mmagenthome>/etc/mmagent.config and stop the process via MirrorMakerProcessHandler.
// A "Not found" message is logged on the path where no entry matched (the guarding condition
// is not visible here).
//   newMirrorMaker - only its name is read
513 private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
514 boolean exists = false;
515 if (mirrorMakers != null) {
516 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
517 for (int i = 0; i < mirrorMakersCount; i++) {
518 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
519 if (mm.name.equals(newMirrorMaker.name)) {
521 mirrorMakers.getListMirrorMaker().remove(i);
522 logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
// Setting i to the cached count makes the loop condition fail, so iteration stops after the
// removal (the cached size is no longer valid once an element has been removed).
523 i = mirrorMakersCount;
// Per-agent consumer properties file; the operation performed on `file` is not visible here.
// NOTE(review): no java.io.File import is visible among this file's imports -- confirm.
529 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
530 File file = new File(path);
// NOTE(review): no handling is visible for this exception -- confirm it is not silently dropped.
532 } catch (Exception ex) {
// Per-agent producer properties file, handled the same way.
535 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
536 File file = new File(path);
538 } catch (Exception ex) {
// Persist the reduced list, then stop the process.
541 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
542 OutputStream out = null;
544 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
545 mirrorMakerProperties.store(out, "");
546 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
// NOTE(review): this handler prints the stack trace instead of using logger like the sibling methods.
547 } catch (IOException ex) {
548 ex.printStackTrace();
553 } catch (IOException e) {
559 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
// Loads <mmagenthome>/etc/mmagent.config into mirrorMakerProperties and populates the agent's
// fields from it: the MirrorMaker list (an empty list when the "mirrormakers" property is
// absent, otherwise parsed from its JSON value), kafkahome, topicURL, topicname, mechid and
// password. The end of this method is not visible in this part of the file.
563 private void loadProperties() {
564 InputStream input = null;
567 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
568 mirrorMakerProperties.load(input);
// No stored list yet: start with an empty one.
570 if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
571 this.mirrorMakers = new ListMirrorMaker();
// NOTE(review): the list read from the getter is discarded by the next assignment.
572 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
573 list = new ArrayList<MirrorMaker>();
574 this.mirrorMakers.setListMirrorMaker(list);
// Otherwise deserialize the stored JSON into a ListMirrorMaker.
576 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
577 ListMirrorMaker.class);
580 this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
581 this.topicURL = mirrorMakerProperties.getProperty("topicURL");
582 this.topicname = mirrorMakerProperties.getProperty("topicname");
583 this.mechid = mirrorMakerProperties.getProperty("mechid");
// NOTE(review): the encryptor is configured but the decrypt call is commented out, so the
// password property is used exactly as stored; the encryptor is unused on the visible lines.
585 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
586 textEncryptor.setPassword(secret);
587 //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
588 this.password = mirrorMakerProperties.getProperty("password");
// NOTE(review): the only visible content of the two handlers below is a disabled
// printStackTrace call -- confirm the exceptions are not silently dropped.
589 } catch (IOException ex) {
590 // ex.printStackTrace();
595 } catch (IOException e) {
596 // e.printStackTrace();