1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
23 package com.att.nsa.dmaapMMAgent;
25 import java.io.BufferedReader;
26 import java.io.DataOutputStream;
28 import java.io.FileInputStream;
29 import java.io.FileOutputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.InputStreamReader;
33 import java.io.OutputStream;
34 import java.net.HttpURLConnection;
36 import java.util.ArrayList;
37 import java.util.Properties;
39 import org.apache.log4j.Level;
40 import org.apache.log4j.Logger;
41 import org.jasypt.util.text.BasicTextEncryptor;
43 import com.att.nsa.dmaapMMAgent.dao.CreateMirrorMaker;
44 import com.att.nsa.dmaapMMAgent.dao.DeleteMirrorMaker;
45 import com.att.nsa.dmaapMMAgent.dao.ListMirrorMaker;
46 import com.att.nsa.dmaapMMAgent.dao.MirrorMaker;
47 import com.att.nsa.dmaapMMAgent.dao.UpdateMirrorMaker;
48 import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList;
49 import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler;
50 import com.google.gson.Gson;
51 import com.google.gson.internal.LinkedTreeMap;
52 import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny;
53 import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
55 public class MirrorMakerAgent {
56 static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
57 Properties mirrorMakerProperties = new Properties();
58 ListMirrorMaker mirrorMakers = null;
59 String mmagenthome = "";
60 String kafkahome = "";
62 String topicname = "";
65 public boolean exitLoop=false;
66 private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
68 public static void main(String[] args) {
69 if (args != null && args.length == 2) {
70 if (args[0].equals("-encrypt")) {
71 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
72 textEncryptor.setPassword(secret);
73 String plainText = textEncryptor.encrypt(args[1]);
74 System.out.println("Encrypted Password is :" + plainText);
77 } else if (args != null && args.length > 0) {
79 "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
82 MirrorMakerAgent agent = new MirrorMakerAgent();
83 if (agent.checkStartup()) {
84 logger.info("mmagent started, loading properties");
85 agent.checkAgentProcess();
86 agent.readAgentTopic();
89 "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
93 private boolean checkStartup() {
94 FileInputStream input = null;
96 this.mmagenthome = System.getProperty("MMAGENTHOME");
97 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
98 logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
99 } catch (IOException ex) {
100 logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex);
106 } catch (IOException e) {
107 logger.error(" IOException occers " + e);
114 /*input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");*/
116 throw new IOException();
118 logger.info("kakahome is set :" + kafkahome);
119 } catch (IOException ex) {
120 logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly" + ex);
126 } catch (IOException e) {
127 logger.error("IOException" + e);
131 String response = publishTopic("{\"test\":\"test\"}");
132 if (response.startsWith("ERROR:")) {
133 logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
134 + this.topicURL + " Error is: " + response);
137 logger.info("Published to Topic :" + this.topicname + " Successfully");
138 response = subscribeTopic("1");
139 if (response != null && response.startsWith("ERROR:")) {
140 logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
141 + this.topicURL + " Error is: " + response);
144 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
148 private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) {
149 InputStream input = null;
150 OutputStream out = null;
153 throw new IOException();
155 input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
156 } catch (IOException ex) {
157 logger.error(" IOException will be handled " + ex);
159 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
160 Properties prop = new Properties();
162 if (propName.equals("consumer")) {
163 prop.setProperty("group.id", agentName);
164 prop.setProperty("zookeeper.connect", info);
166 prop.setProperty("metadata.broker.list", info);
168 out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
171 } catch (Exception e) {
172 logger.error("Exception at checkPropertiesFile " +e);
178 } catch (IOException e) {
179 logger.error("Exception occurred is " +e);
185 } catch (IOException e) {
187 logger.error("Exception is : "+e);
193 private void checkAgentProcess() {
194 logger.info("Checking MirrorMaker Process");
195 if (mirrorMakers != null) {
196 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
197 for (int i = 0; i < mirrorMakersCount; i++) {
198 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
199 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) {
200 checkPropertiesFile(mm.name, "consumer", mm.consumer, false);
201 checkPropertiesFile(mm.name, "producer", mm.producer, false);
203 if (mm.whitelist != null && !mm.whitelist.equals("")) {
204 logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
205 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
206 mmagenthome + "/etc/" + mm.name + "consumer.properties",
207 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist);
208 mm.setStatus("RESTARTING");
211 logger.info("MirrorMaker " + mm.name + " is STOPPED");
212 mm.setStatus("STOPPED");
216 } catch (InterruptedException e) {
217 Thread.currentThread().interrupt();
219 mirrorMakers.getListMirrorMaker().set(i, mm);
221 logger.info("MirrorMaker " + mm.name + " is running");
222 mm.setStatus("RUNNING");
223 mirrorMakers.getListMirrorMaker().set(i, mm);
227 // Gson g = new Gson();
228 // System.out.println(g.toJson(mirrorMakers));
231 public String subscribeTopic(String timeout) {
232 String response = "";
234 String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout
236 String authString = this.mechid + ":" + this.password;
237 String authStringEnc = Base64.encode(authString.getBytes());
238 URL url = new URL(requestURL);
239 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
240 connection.setRequestMethod("GET");
241 connection.setDoOutput(true);
242 connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
243 connection.setRequestProperty("Content-Type", "application/json");
244 InputStream content = (InputStream) connection.getInputStream();
245 BufferedReader in = new BufferedReader(new InputStreamReader(content));
248 while ((line = in.readLine()) != null) {
249 response = response + line;
252 // get message as JSON String Array
253 String[] topicMessage = g.fromJson(response, String[].class);
254 if (topicMessage.length != 0) {
255 return topicMessage[0];
257 } catch (Exception e) {
258 logger.error(" Exception Occered " + e);
259 return "ERROR:" + e.getMessage() + " Server Response is:" + response;
264 public String publishTopic(String message) {
266 String requestURL = this.topicURL + "/events/" + this.topicname;
267 String authString = this.mechid + ":" + this.password;
268 String authStringEnc = Base64.encode(authString.getBytes());
269 URL url = new URL(requestURL);
270 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
271 connection.setRequestMethod("POST");
272 connection.setDoOutput(true);
273 connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
274 connection.setRequestProperty("Content-Type", "application/json");
275 connection.setRequestProperty("Content-Length", Integer.toString(message.length()));
276 DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
277 wr.write(message.getBytes());
279 InputStream content = (InputStream) connection.getInputStream();
280 BufferedReader in = new BufferedReader(new InputStreamReader(content));
282 String response = "";
283 while ((line = in.readLine()) != null) {
284 response = response + line;
288 } catch (Exception e) {
289 logger.error(" Exception Occered " + e);
290 return "ERROR:" + e.getLocalizedMessage();
294 public void readAgentTopic() {
296 int connectionattempt = 0;
298 logger.info("--------------------------------");
299 logger.info("Waiting for Messages for 60 secs");
300 String topicMessage = subscribeTopic("60000");
302 LinkedTreeMap<?, ?> object = null;
303 if (topicMessage != null) {
305 object = g.fromJson(topicMessage, LinkedTreeMap.class);
307 // Cast the 1st item (since limit=1 and see the type of
309 readAgent(object, topicMessage);
310 } catch (Exception ex) {
312 if (connectionattempt > 5) {
313 logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage + ex);
316 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
317 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
321 // Check all MirrorMaker every min
322 connectionattempt = 0;
329 } catch (Exception e) {
330 logger.error("Exception at readAgentTopic : " + e);
335 public void readAgent(LinkedTreeMap<?, ?> object,String topicMessage){
339 if (object.get("createMirrorMaker") != null) {
340 logger.info("Received createMirrorMaker request from topic");
341 CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
342 createMirrorMaker(m.getCreateMirrorMaker());
344 mirrorMakers.setMessageID(m.getMessageID());
345 publishTopic(g.toJson(mirrorMakers));
346 mirrorMakers.setMessageID("");
347 } else if (object.get("updateMirrorMaker") != null) {
348 logger.info("Received updateMirrorMaker request from topic");
349 UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
350 updateMirrorMaker(m.getUpdateMirrorMaker());
352 mirrorMakers.setMessageID(m.getMessageID());
353 publishTopic(g.toJson(mirrorMakers));
354 mirrorMakers.setMessageID("");
355 } else if (object.get("deleteMirrorMaker") != null) {
356 logger.info("Received deleteMirrorMaker request from topic");
357 DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
358 deleteMirrorMaker(m.getDeleteMirrorMaker());
360 mirrorMakers.setMessageID(m.getMessageID());
361 publishTopic(g.toJson(mirrorMakers));
362 mirrorMakers.setMessageID("");
363 } else if (object.get("listAllMirrorMaker") != null) {
364 logger.info("Received listALLMirrorMaker request from topic");
366 mirrorMakers.setMessageID((String) object.get("messageID"));
367 publishTopic(g.toJson(mirrorMakers));
368 mirrorMakers.setMessageID("");
369 } else if (object.get("updateWhiteList") != null) {
370 logger.info("Received updateWhiteList request from topic");
371 UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
372 updateWhiteList(m.getUpdateWhiteList());
374 mirrorMakers.setMessageID(m.getMessageID());
375 publishTopic(g.toJson(mirrorMakers));
376 mirrorMakers.setMessageID("");
377 } else if (object.get("listMirrorMaker") != null) {
378 logger.info("Received listMirrorMaker from topic, skipping messages");
380 logger.info("Received unknown request from topic");
385 protected void createMirrorMaker(MirrorMaker newMirrorMaker) {
386 boolean exists = false;
387 if (mirrorMakers != null) {
388 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
389 for (int i = 0; i < mirrorMakersCount; i++) {
390 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
391 if (mm.name.equals(newMirrorMaker.name)) {
393 logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
398 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
399 if (exists == false && mirrorMakers != null) {
400 mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
401 } else if (exists == false && mirrorMakers == null) {
402 mirrorMakers = new ListMirrorMaker();
403 ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
404 list = new ArrayList();
405 list.add(newMirrorMaker);
406 mirrorMakers.setListMirrorMaker(list);
408 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
409 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
412 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
413 OutputStream out = null;
415 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
416 mirrorMakerProperties.store(out, "");
417 } catch (IOException ex) {
418 logger.error(" IOException Occered " + ex);
423 } catch (IOException e) {
430 private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
431 boolean exists = false;
432 if (mirrorMakers != null) {
433 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
434 for (int i = 0; i < mirrorMakersCount; i++) {
435 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
436 if (mm.name.equals(newMirrorMaker.name)) {
438 mm.setConsumer(newMirrorMaker.getConsumer());
439 mm.setProducer(newMirrorMaker.getProducer());
440 mirrorMakers.getListMirrorMaker().set(i, mm);
441 logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
446 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
447 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
450 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
451 OutputStream out = null;
453 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
454 mirrorMakerProperties.store(out, "");
455 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
458 } catch (InterruptedException e) {
459 logger.log(Level.WARN, "Interrupted!", e);
460 Thread.currentThread().interrupt();
462 } catch (IOException ex) {
463 logger.error(" IOException Occered " + ex);
468 } catch (IOException e) {
469 logger.error(" IOException Occered " + e);
474 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
478 private void updateWhiteList(MirrorMaker newMirrorMaker) {
479 boolean exists = false;
480 if (mirrorMakers != null) {
481 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
482 for (int i = 0; i < mirrorMakersCount; i++) {
483 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
484 if (mm.name.equals(newMirrorMaker.name)) {
486 mm.setWhitelist(newMirrorMaker.whitelist);
487 mirrorMakers.getListMirrorMaker().set(i, mm);
488 logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
489 + newMirrorMaker.whitelist);
495 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
496 OutputStream out = null;
498 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
499 mirrorMakerProperties.store(out, "");
500 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
503 } catch (InterruptedException e) {
504 logger.log(Level.WARN, "Interrupted!", e);
505 Thread.currentThread().interrupt();
507 } catch (IOException ex) {
508 logger.error("Exception at updateWhiteList : " + ex);
513 } catch (IOException e) {
515 logger.error("IOException occered " + e);
520 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
524 private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
525 boolean exists = false;
526 if (mirrorMakers != null) {
527 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
528 for (int i = 0; i < mirrorMakersCount; i++) {
529 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
530 if (mm.name.equals(newMirrorMaker.name)) {
532 mirrorMakers.getListMirrorMaker().remove(i);
533 logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
534 i = mirrorMakersCount;
540 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
541 File file = new File(path);
543 } catch (Exception ex) {
546 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
547 File file = new File(path);
549 } catch (Exception ex) {
552 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
553 OutputStream out = null;
555 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
556 mirrorMakerProperties.store(out, "");
557 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
558 } catch (IOException ex) {
559 ex.printStackTrace();
564 } catch (IOException e) {
570 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
574 private void loadProperties() {
575 InputStream input = null;
578 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
579 mirrorMakerProperties.load(input);
581 if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
582 this.mirrorMakers = new ListMirrorMaker();
583 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
584 list = new ArrayList<MirrorMaker>();
585 this.mirrorMakers.setListMirrorMaker(list);
587 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
588 ListMirrorMaker.class);
591 this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
592 this.topicURL = mirrorMakerProperties.getProperty("topicURL");
593 this.topicname = mirrorMakerProperties.getProperty("topicname");
594 this.mechid = mirrorMakerProperties.getProperty("mechid");
596 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
597 textEncryptor.setPassword(secret);
598 //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
599 this.password = mirrorMakerProperties.getProperty("password");
600 } catch (IOException ex) {
601 // ex.printStackTrace();
606 } catch (IOException e) {
607 // e.printStackTrace();