1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaapMMAgent;
24 import java.io.BufferedReader;
25 import java.io.DataOutputStream;
27 import java.io.FileInputStream;
28 import java.io.FileOutputStream;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.InputStreamReader;
32 import java.io.OutputStream;
33 import java.net.HttpURLConnection;
35 import java.util.ArrayList;
36 import java.util.Properties;
37 //import org.json.JSONObject;
38 //import org.apache.log4j.Logger;
39 //import org.jasypt.util.text.BasicTextEncryptor;
41 import com.att.nsa.dmaapMMAgent.dao.CreateMirrorMaker;
42 import com.att.nsa.dmaapMMAgent.dao.DeleteMirrorMaker;
43 import com.att.nsa.dmaapMMAgent.dao.ListMirrorMaker;
44 import com.att.nsa.dmaapMMAgent.dao.MirrorMaker;
45 import com.att.nsa.dmaapMMAgent.dao.UpdateMirrorMaker;
46 import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList;
47 import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler;
48 import com.google.gson.Gson;
49 import com.google.gson.JsonArray;
50 import com.google.gson.internal.LinkedTreeMap;
51 import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny;
52 import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
54 public class MirrorMakerAgent {/*
// Agent state. NOTE(review): this view of the file omits some lines, so other fields used
// below (topicURL, mechid, password, grepLog) are not declared in the visible text.
55 static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
// In-memory copy of <mmagenthome>/etc/mmagent.config; the handlers store it back to that
// file after updating its "mirrormakers" property.
56 Properties mirrorMakerProperties = new Properties();
// Configured MirrorMakers; serialized with Gson under the "mirrormakers" property.
57 ListMirrorMaker mirrorMakers = null;
// Taken from the MMAGENTHOME system property in checkStartup().
58 String mmagenthome = "";
// Taken from the "kafkahome" property in loadProperties().
59 String kafkahome = "";
// Taken from the "topicname" property in loadProperties().
61 String topicname = "";
// Key handed to BasicTextEncryptor.setPassword() both for "-encrypt" in main() and for
// decrypting the configured password in loadProperties().
// NOTE(review): the key is hard-coded in source, so anyone with the source can decrypt
// the stored password - consider supplying it from outside the code base.
65 private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
// Command-line entry point.
// With exactly two arguments where the first is "-encrypt", the second argument is
// encrypted with a BasicTextEncryptor keyed by `secret` and the result is printed.
// A following else-if branch for a non-empty argument list carries the usage text.
// The agent path creates a MirrorMakerAgent and, when checkStartup() returns true, calls
// checkAgentProcess() and then readAgentTopic(); the startup error text at the end
// belongs to the failure path.
// NOTE(review): the statements between the visible lines (returns, `try`, the calls that
// emit the usage/error strings) are not shown in this view - confirm the exact flow.
67 public static void main(String[] args) {
68 if (args != null && args.length == 2) {
69 if (args[0].equals("-encrypt")) {
70 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
71 textEncryptor.setPassword(secret);
// Despite its name, plainText holds the encrypted form of args[1].
72 String plainText = textEncryptor.encrypt(args[1]);
73 System.out.println("Encrypted Password is :" + plainText);
76 } else if (args != null && args.length > 0) {
78 "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
81 MirrorMakerAgent agent = new MirrorMakerAgent();
82 if (agent.checkStartup()) {
83 logger.info("mmagent started, loading properties");
85 agent.checkAgentProcess();
86 } catch (Exception e) {
90 agent.readAgentTopic();
93 "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
// Verifies the agent's prerequisites; main() only proceeds when this returns true.
// Visible steps, in order:
//  - reads the MMAGENTHOME system property and opens <mmagenthome>/etc/mmagent.config
//  - opens <kafkahome>/bin/kafka-run-class.sh
//  - publishes {"test":"test"} with publishTopic() and checks the reply for "ERROR:"
//  - calls subscribeTopic("1") and checks the reply the same way
// NOTE(review): the return statements, the stream close calls and whatever populates
// kafkahome before it is used are not visible in this view - confirm.
97 private boolean checkStartup() {
98 FileInputStream input = null;
100 this.mmagenthome = System.getProperty("MMAGENTHOME");
// Opening the file doubles as an existence check for the agent configuration.
101 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
102 logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
103 } catch (IOException ex) {
104 logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file");
110 } catch (IOException e) {
// Same existence-check approach for the Kafka launcher script.
118 input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
// NOTE(review): "kakahome" in this log message looks like a typo for "kafkahome".
119 logger.info("kakahome is set :" + kafkahome);
120 } catch (IOException ex) {
121 logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly");
127 } catch (IOException e) {
// publishTopic() reports failures as a string starting with "ERROR:" instead of throwing.
132 String response = publishTopic("{\"test\":\"test\"}");
133 if (response.startsWith("ERROR:")) {
134 logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
135 + this.topicURL + " Error is: " + response);
138 logger.info("Published to Topic :" + this.topicname + " Successfully");
139 response = subscribeTopic("1");
// Unlike the publish reply above, the subscribe reply is null-checked before use.
140 if (response != null && response.startsWith("ERROR:")) {
141 logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
142 + this.topicURL + " Error is: " + response);
145 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
// Prepares <mmagenthome>/etc/<mm.name><propName>.properties for one MirrorMaker.
// The code first tries to open that per-MirrorMaker file. The IOException handler opens
// the shared template <mmagenthome>/etc/<propName>.properties instead, sets the
// MirrorMaker-specific keys on a Properties object and opens the per-MirrorMaker file
// for writing.
// Callers in this file pass "consumer" or "producer" as propName.
// NOTE(review): the condition guarding `throw new IOException()` (presumably `refresh`),
// the Properties load/store calls and the stream clean-up are not visible in this view -
// confirm.
149 private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) {
150 InputStream input = null;
151 OutputStream out = null;
// Thrown deliberately so that control reaches the handler that (re)writes the file.
154 throw new IOException();
156 input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
157 } catch (IOException ex) {
159 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
160 Properties prop = new Properties();
162 if (propName.equals("consumer")) {
// The consumer group id is the MirrorMaker name.
163 prop.setProperty("group.id", mm.name);
165 prop.setProperty("bootstrap.servers", mm.consumer);
166 prop.setProperty("client.id", mm.name + "MM_consumer");
168 prop.setProperty("bootstrap.servers", mm.producer);
169 prop.setProperty("client.id", mm.name + "MM_producer");
172 out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
175 } catch (Exception e) {
182 } catch (IOException e) {
189 } catch (IOException e) {
// Walks every configured MirrorMaker and records its process state.
// For an entry whose MirrorMakerProcessHandler.checkMirrorMakerProcess(...) result is
// false, both property files are checked with refresh=false; when the whitelist is
// non-empty startMirrorMaker(...) is called and the status becomes "RESTARTING", and
// "STOPPED" is the other status set on this path. Otherwise the status is "RUNNING".
// Each updated entry is written back into the list with set(i, mm).
// The loop is skipped when mirrorMakers is null.
// Declared to throw Exception; which call raises it is not visible in this view.
196 private void checkAgentProcess() throws Exception {
197 logger.info("Checking MirrorMaker Process");
198 if (mirrorMakers != null) {
199 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
200 for (int i = 0; i < mirrorMakersCount; i++) {
201 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
202 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name,mm.enablelogCheck,this.grepLog) == false) {
203 checkPropertiesFile(mm, "consumer", false);
204 checkPropertiesFile(mm, "producer", false);
// Only start a MirrorMaker that has topics to mirror.
206 if (mm.whitelist != null && !mm.whitelist.equals("")) {
207 logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
208 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
209 mmagenthome + "/etc/" + mm.name + "consumer.properties",
210 mmagenthome + "/etc/" + mm.name + "producer.properties",mm.numStreams, mm.whitelist);
211 mm.setStatus("RESTARTING");
214 logger.info("MirrorMaker " + mm.name + " is STOPPED");
215 mm.setStatus("STOPPED");
219 } catch (InterruptedException e) {
221 mirrorMakers.getListMirrorMaker().set(i, mm);
223 logger.info("MirrorMaker " + mm.name + " is running");
224 mm.setStatus("RUNNING");
225 mirrorMakers.getListMirrorMaker().set(i, mm);
229 // Gson g = new Gson();
230 // System.out.println(g.toJson(mirrorMakers));
// Reads from the agent topic with an HTTP GET using Basic authentication built from
// mechid and password, and returns the first message in the reply.
// `timeout` is appended to the request URL as the "timeout" query parameter.
// The reply body is parsed either as a JsonArray (first element's toString()) or as a
// String[] (first element).
// Any exception is converted into a string starting with "ERROR:" that carries the
// exception message and the raw server reply, so callers test the prefix.
// NOTE(review): the declaration of `g` and `line`, the condition choosing between the two
// parse paths, the tail of the requestURL expression and the value returned for an empty
// reply are not visible in this view - confirm.
233 private String subscribeTopic(String timeout) {
234 String response = "";
// "mirrormakeragent" and "1" are fixed path segments after the topic name.
236 String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout
238 String authString = this.mechid + ":" + this.password;
239 String authStringEnc = Base64.encode(authString.getBytes());
240 URL url = new URL(requestURL);
241 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
242 connection.setRequestMethod("GET");
243 connection.setDoOutput(true);
244 connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
245 connection.setRequestProperty("Content-Type", "application/json");
246 InputStream content = (InputStream) connection.getInputStream();
247 BufferedReader in = new BufferedReader(new InputStreamReader(content));
// Concatenates all lines of the reply without separators.
250 while ((line = in.readLine()) != null) {
251 response = response + line;
254 //Get message as JSON Array
255 JsonArray topicMessage = g.fromJson(response, JsonArray.class);
256 if (topicMessage.size() != 0) {
257 return topicMessage.get(0).toString();
260 // get message as JSON String Array
261 String[] topicMessage = g.fromJson(response, String[].class);
262 if (topicMessage.length != 0) {
263 return topicMessage[0];
265 } catch (Exception e) {
266 return "ERROR:" + e.getMessage() + " Server Response is:" + response;
// Posts `message` as application/json to <topicURL>/events/<topicname> using Basic
// authentication built from mechid and password, then reads the reply body line by line.
// Any exception is converted into a string starting with "ERROR:" followed by the
// exception's localized message, so callers test the prefix.
// NOTE(review): the successful return value (presumably the accumulated `response`), the
// declaration of `line` and the stream flush/close calls are not visible in this view -
// confirm.
271 private String publishTopic(String message) {
273 String requestURL = this.topicURL + "/events/" + this.topicname;
274 String authString = this.mechid + ":" + this.password;
275 String authStringEnc = Base64.encode(authString.getBytes());
276 URL url = new URL(requestURL);
277 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
278 connection.setRequestMethod("POST");
279 connection.setDoOutput(true);
280 connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
281 connection.setRequestProperty("Content-Type", "application/json");
// NOTE(review): length() counts chars while the body below is written with getBytes();
// the two can differ for non-ASCII messages - confirm this header is what is intended.
282 connection.setRequestProperty("Content-Length", Integer.toString(message.length()));
283 DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
284 wr.write(message.getBytes());
286 InputStream content = (InputStream) connection.getInputStream();
287 BufferedReader in = new BufferedReader(new InputStreamReader(content));
289 String response = "";
290 while ((line = in.readLine()) != null) {
291 response = response + line;
295 } catch (Exception e) {
296 return "ERROR:" + e.getLocalizedMessage();
// Reads requests from the agent topic and dispatches them.
// A message is fetched with subscribeTopic("60000"); if it starts with a double quote it
// is first decoded as a JSON string, then parsed into a LinkedTreeMap and dispatched on
// the key that is present:
//   createMirrorMaker / updateMirrorMaker / deleteMirrorMaker / updateWhiteList
//       -> the matching handler, then the current list is published with the request's
//          messageID
//   listAllMirrorMaker -> the current list is published with the request's messageID
//   listMirrorMaker    -> logged and skipped
//   the remaining log line reports an unknown request
// messageID is reset to "" after every publish.
// The exception handler logs a retry message, or a shutdown message once
// connectionattempt exceeds 5.
// NOTE(review): the enclosing loop, the declaration of `g`, the updates to
// connectionattempt and the sleep/exit/checkAgentProcess calls are not visible in this
// view - confirm.
300 private void readAgentTopic() {
302 int connectionattempt = 0;
304 logger.info("--------------------------------");
305 logger.info("Waiting for Messages for 60 secs");
306 String topicMessage = subscribeTopic("60000");
308 LinkedTreeMap<?, ?> object = null;
309 if (topicMessage != null) {
311 //Check and parse if String object returned by consumer API
312 //else use the jsonObject
313 if( topicMessage.startsWith("\""))
315 topicMessage = g.fromJson(topicMessage.toString(), String.class);
317 object = g.fromJson(topicMessage, LinkedTreeMap.class);
319 // Cast the 1st item (since limit=1 and see the type of
321 if (object.get("createMirrorMaker") != null) {
322 logger.info("Received createMirrorMaker request from topic");
323 CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
324 createMirrorMaker(m.getCreateMirrorMaker());
326 mirrorMakers.setMessageID(m.getMessageID());
327 publishTopic(g.toJson(mirrorMakers));
328 mirrorMakers.setMessageID("");
329 } else if (object.get("updateMirrorMaker") != null) {
330 logger.info("Received updateMirrorMaker request from topic");
331 UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
// The raw JSON is inspected because the typed object cannot show whether numStreams was
// supplied; when absent it is forced to 0, and updateMirrorMaker() only applies values >= 1.
332 JSONObject json = new JSONObject(topicMessage);
333 JSONObject json2 = (JSONObject) json.get("updateMirrorMaker");
334 if(!json2.has("numStreams")){
335 m.getUpdateMirrorMaker().setNumStreams(0);
337 updateMirrorMaker(m.getUpdateMirrorMaker());
339 mirrorMakers.setMessageID(m.getMessageID());
340 publishTopic(g.toJson(mirrorMakers));
341 mirrorMakers.setMessageID("");
342 } else if (object.get("deleteMirrorMaker") != null) {
343 logger.info("Received deleteMirrorMaker request from topic");
344 DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
345 deleteMirrorMaker(m.getDeleteMirrorMaker());
347 mirrorMakers.setMessageID(m.getMessageID());
348 publishTopic(g.toJson(mirrorMakers));
349 mirrorMakers.setMessageID("");
350 } else if (object.get("listAllMirrorMaker") != null) {
351 logger.info("Received listALLMirrorMaker request from topic");
// No typed request class here: the messageID is read straight from the parsed map.
353 mirrorMakers.setMessageID((String) object.get("messageID"));
354 publishTopic(g.toJson(mirrorMakers));
355 mirrorMakers.setMessageID("");
356 } else if (object.get("updateWhiteList") != null) {
357 logger.info("Received updateWhiteList request from topic");
358 UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
359 updateWhiteList(m.getUpdateWhiteList());
361 mirrorMakers.setMessageID(m.getMessageID());
362 publishTopic(g.toJson(mirrorMakers));
363 mirrorMakers.setMessageID("");
364 } else if (object.get("listMirrorMaker") != null) {
365 logger.info("Received listMirrorMaker from topic, skipping messages");
367 logger.info("Received unknown request from topic");
369 } catch (Exception ex) {
371 if (connectionattempt > 5) {
372 logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
375 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
376 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
380 // Check all MirrorMaker every min
381 connectionattempt = 0;
386 } catch (Exception e) {
// Registers a new MirrorMaker unless one with the same name already exists.
// The existing list is scanned by name; a new entry is appended to the list, or a fresh
// ListMirrorMaker is created first when mirrorMakers is still null.
// Both property files are then generated with refresh=true and the whole list is written
// as JSON into the "mirrormakers" property of <mmagenthome>/etc/mmagent.config.
// NOTE(review): the line that sets `exists`, the enclosing conditions around the
// property-file and persistence steps, the declaration of `g` and the stream close are
// not visible in this view - confirm.
392 private void createMirrorMaker(MirrorMaker newMirrorMaker) {
393 boolean exists = false;
394 if (mirrorMakers != null) {
395 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
396 for (int i = 0; i < mirrorMakersCount; i++) {
397 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
398 if (mm.name.equals(newMirrorMaker.name)) {
400 logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
405 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
406 if (exists == false && mirrorMakers != null) {
407 mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
408 } else if (exists == false && mirrorMakers == null) {
409 mirrorMakers = new ListMirrorMaker();
// The value fetched here is immediately replaced by the new ArrayList on the next line.
410 ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
411 list = new ArrayList<MirrorMaker>();
412 list.add(newMirrorMaker);
413 mirrorMakers.setListMirrorMaker(list);
415 checkPropertiesFile(newMirrorMaker, "consumer", true);
416 checkPropertiesFile(newMirrorMaker, "producer", true);
419 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
420 OutputStream out = null;
422 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
423 mirrorMakerProperties.store(out, "");
424 } catch (IOException ex) {
425 ex.printStackTrace();
430 } catch (IOException e) {
// Applies an update request to the MirrorMaker with the same name.
// On the matching entry: consumer and producer are replaced only when the request value
// is non-null, numStreams only when it is >= 1, and enablelogCheck is always copied.
// Afterwards both property files are regenerated with refresh=true, the list is written
// as JSON into the "mirrormakers" property of <mmagenthome>/etc/mmagent.config, and
// MirrorMakerProcessHandler.stopMirrorMaker(name) is called.
// A "Not found" message is logged for an unknown name.
// NOTE(review): the line that sets `exists`, the conditions enclosing the persistence
// steps, the declaration of `g` and the call that raises InterruptedException are not
// visible in this view - confirm.
437 private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
438 boolean exists = false;
439 if (mirrorMakers != null) {
440 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
441 for (int i = 0; i < mirrorMakersCount; i++) {
442 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
443 if (mm.name.equals(newMirrorMaker.name)) {
445 if(null!=newMirrorMaker.getConsumer())
447 mm.setConsumer(newMirrorMaker.getConsumer());
449 if(null!=newMirrorMaker.getProducer())
451 mm.setProducer(newMirrorMaker.getProducer());
// readAgentTopic() forces numStreams to 0 when the request omitted it, so omitted
// values leave the stored setting untouched.
453 if(newMirrorMaker.getNumStreams()>=1)
455 mm.setNumStreams(newMirrorMaker.getNumStreams());
458 mm.setEnablelogCheck(newMirrorMaker.enablelogCheck);
460 mirrorMakers.getListMirrorMaker().set(i, mm);
462 logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
// NOTE(review): the request object is passed here rather than the merged entry `mm`, so a
// consumer/producer omitted from the request would be null inside checkPropertiesFile -
// confirm this is intended.
467 checkPropertiesFile(newMirrorMaker, "consumer", true);
468 checkPropertiesFile(newMirrorMaker, "producer", true);
471 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
472 OutputStream out = null;
474 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
475 mirrorMakerProperties.store(out, "");
476 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
479 } catch (InterruptedException e) {
481 } catch (IOException ex) {
482 ex.printStackTrace();
487 } catch (IOException e) {
493 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
// Replaces the whitelist of the MirrorMaker with the same name as the request.
// The matching entry gets newMirrorMaker.whitelist and is written back into the list;
// the list is then written as JSON into the "mirrormakers" property of
// <mmagenthome>/etc/mmagent.config and MirrorMakerProcessHandler.stopMirrorMaker(name)
// is called.
// A "Not found" message is logged for an unknown name.
// NOTE(review): the line that sets `exists`, the conditions enclosing the persistence
// steps, the declaration of `g` and the call that raises InterruptedException are not
// visible in this view - confirm.
497 private void updateWhiteList(MirrorMaker newMirrorMaker) {
498 boolean exists = false;
499 if (mirrorMakers != null) {
500 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
501 for (int i = 0; i < mirrorMakersCount; i++) {
502 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
503 if (mm.name.equals(newMirrorMaker.name)) {
505 mm.setWhitelist(newMirrorMaker.whitelist);
506 mirrorMakers.getListMirrorMaker().set(i, mm);
507 logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
508 + newMirrorMaker.whitelist);
514 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
515 OutputStream out = null;
517 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
518 mirrorMakerProperties.store(out, "");
519 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
522 } catch (InterruptedException e) {
524 } catch (IOException ex) {
525 ex.printStackTrace();
530 } catch (IOException e) {
536 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
// Removes the MirrorMaker with the same name as the request.
// The matching entry is removed from the list; File objects are built for its consumer
// and producer property files; the list is written as JSON into the "mirrormakers"
// property of <mmagenthome>/etc/mmagent.config and
// MirrorMakerProcessHandler.stopMirrorMaker(name) is called.
// A "Not found" message is logged for an unknown name.
// NOTE(review): the line that sets `exists`, the operation performed on the two File
// objects (presumably a delete), the conditions enclosing these steps and the declaration
// of `g` are not visible in this view - confirm.
540 private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
541 boolean exists = false;
542 if (mirrorMakers != null) {
543 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
544 for (int i = 0; i < mirrorMakersCount; i++) {
545 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
546 if (mm.name.equals(newMirrorMaker.name)) {
548 mirrorMakers.getListMirrorMaker().remove(i);
549 logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
// Ends the loop: the list is now one shorter than the cached mirrorMakersCount.
550 i = mirrorMakersCount;
556 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
557 File file = new File(path);
559 } catch (Exception ex) {
562 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
563 File file = new File(path);
565 } catch (Exception ex) {
568 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
569 OutputStream out = null;
571 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
572 mirrorMakerProperties.store(out, "");
573 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
574 } catch (IOException ex) {
575 ex.printStackTrace();
580 } catch (IOException e) {
586 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
590 private void loadProperties() {
591 InputStream input = null;
594 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
595 mirrorMakerProperties.load(input);
597 if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
598 this.mirrorMakers = new ListMirrorMaker();
599 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
600 list = new ArrayList<MirrorMaker>();
601 this.mirrorMakers.setListMirrorMaker(list);
603 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
604 ListMirrorMaker.class);
607 this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
608 this.topicURL = mirrorMakerProperties.getProperty("topicURL");
609 this.topicname = mirrorMakerProperties.getProperty("topicname");
610 this.mechid = mirrorMakerProperties.getProperty("mechid");
611 this.grepLog= mirrorMakerProperties.getProperty("grepLog");
613 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
614 textEncryptor.setPassword(secret);
615 this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
616 } catch (IOException ex) {
617 // ex.printStackTrace();
622 } catch (IOException e) {
623 // e.printStackTrace();