1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
23 package com.att.nsa.dmaapMMAgent;
25 import java.io.BufferedReader;
26 import java.io.DataOutputStream;
28 import java.io.FileInputStream;
29 import java.io.FileOutputStream;
30 import java.io.IOException;
31 import java.io.InputStream;
32 import java.io.InputStreamReader;
33 import java.io.OutputStream;
34 import java.net.HttpURLConnection;
36 import java.util.ArrayList;
37 import java.util.Properties;
39 import org.apache.log4j.Logger;
40 import org.jasypt.util.text.BasicTextEncryptor;
42 import com.att.nsa.dmaapMMAgent.dao.CreateMirrorMaker;
43 import com.att.nsa.dmaapMMAgent.dao.DeleteMirrorMaker;
44 import com.att.nsa.dmaapMMAgent.dao.ListMirrorMaker;
45 import com.att.nsa.dmaapMMAgent.dao.MirrorMaker;
46 import com.att.nsa.dmaapMMAgent.dao.UpdateMirrorMaker;
47 import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList;
48 import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler;
49 import com.google.gson.Gson;
50 import com.google.gson.internal.LinkedTreeMap;
51 import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny;
52 import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
/**
 * Agent whose visible methods maintain a list of MirrorMaker definitions
 * (create, update, delete, whitelist change) and exchange JSON messages with a
 * topic over HTTP.
 */
54 public class MirrorMakerAgent {
55 static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
// Settings loaded from, and stored back to, <mmagenthome>/etc/mmagent.config.
56 Properties mirrorMakerProperties = new Properties();
// Known MirrorMakers; stays null until loadProperties() or createMirrorMaker() assigns it.
57 ListMirrorMaker mirrorMakers = null;
// Agent base directory, taken from the MMAGENTHOME system property in checkStartup().
58 String mmagenthome = "";
// Kafka installation directory, read from the "kafkahome" property.
59 String kafkahome = "";
// Topic name used when building the publish/subscribe URLs; read from the "topicname" property.
61 String topicname = "";
// Password given to BasicTextEncryptor in main() (-encrypt) and in loadProperties().
// NOTE(review): this secret is hard-coded in source; consider supplying it externally.
64 private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
/**
 * Command-line entry point for the agent.
 *
 * When exactly two arguments are given and the first equals "-encrypt", the
 * second is encrypted with a BasicTextEncryptor whose password is the class
 * secret, and the result is printed. A separate branch for a non-empty
 * argument list (args.length > 0) carries the usage text. The visible code
 * also creates a MirrorMakerAgent and, when checkStartup() returns true, calls
 * checkAgentProcess() followed by readAgentTopic().
 *
 * NOTE(review): the statements between these branches (returns, else arms) are
 * not visible in this excerpt, so the exact flow between them is unconfirmed.
 *
 * @param args command-line arguments; a null array is tolerated by the checks below
 */
66 public static void main(String[] args) {
67 if (args != null && args.length == 2) {
68 if (args[0].equals("-encrypt")) {
69 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
// The same secret is set as the encryptor password in loadProperties().
70 textEncryptor.setPassword(secret);
// Despite its name, plainText holds the encrypted form of args[1].
71 String plainText = textEncryptor.encrypt(args[1]);
72 System.out.println("Encrypted Password is :" + plainText);
75 } else if (args != null && args.length > 0) {
// Usage text for the two supported invocations: no arguments, or -encrypt <password>.
77 "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
80 MirrorMakerAgent agent = new MirrorMakerAgent();
// Only start working when the start-up checks succeed.
81 if (agent.checkStartup()) {
82 logger.info("mmagent started, loading properties");
83 agent.checkAgentProcess();
84 agent.readAgentTopic();
// Message for the unsuccessful start-up case (presumably the else arm — not visible here).
87 "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
/**
 * Runs the start-up checks used by main().
 *
 * The visible steps are: read the agent home from the MMAGENTHOME system
 * property and open etc/mmagent.config beneath it; check the Kafka home;
 * publish a small test message to the topic; and subscribe once with a
 * timeout argument of "1". A publish or subscribe response that starts with
 * "ERROR:" is logged as an error.
 *
 * @return the result of the checks. NOTE(review): the return statements are
 *         not visible in this excerpt; presumably false when a check fails
 *         and true otherwise — confirm.
 */
91 private boolean checkStartup() {
92 FileInputStream input = null;
94 this.mmagenthome = System.getProperty("MMAGENTHOME");
// Opening the config file doubles as an existence check; a failure lands in the catch below.
95 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
96 logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
97 } catch (IOException ex) {
98 logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex);
104 } catch (IOException e) {
105 logger.error(" IOException occers " + e);
// The direct check for kafka-run-class.sh is disabled; see the commented-out line.
112 /*input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");*/
// NOTE(review): the condition guarding this throw is not visible here.
114 throw new IOException();
116 logger.info("kakahome is set :" + kafkahome);
117 } catch (IOException ex) {
118 logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly" + ex);
124 } catch (IOException e) {
125 logger.error("IOException" + e);
// Publish a small JSON probe to verify that the topic accepts messages.
129 String response = publishTopic("{\"test\":\"test\"}");
// NOTE(review): response is dereferenced without a null check here, unlike the subscribe check below.
130 if (response.startsWith("ERROR:")) {
131 logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
132 + this.topicURL + " Error is: " + response);
135 logger.info("Published to Topic :" + this.topicname + " Successfully");
// Subscribe once with timeout "1" to verify read access to the topic.
136 response = subscribeTopic("1");
137 if (response != null && response.startsWith("ERROR:")) {
138 logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
139 + this.topicURL + " Error is: " + response);
142 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
/**
 * Checks the per-agent properties file etc/<agentName><propName>.properties
 * under the agent home and prepares a new one when it cannot be opened.
 *
 * When an IOException is raised in the first part, the generic
 * etc/<propName>.properties file is opened, a Properties object is filled in
 * and an output stream to the per-agent file is opened. For propName
 * "consumer", group.id is set to agentName and zookeeper.connect to info;
 * metadata.broker.list is also set to info (presumably in the non-consumer
 * arm — the else is not visible in this excerpt).
 *
 * @param agentName MirrorMaker name; used as the file-name prefix and as the consumer group.id
 * @param propName  property-file kind; callers in this file pass "consumer" or "producer"
 * @param info      connection string written into the generated properties
 * @param refresh   callers pass true after create/update and false from checkAgentProcess().
 *                  NOTE(review): presumably forces regeneration through the throw below — confirm.
 */
146 private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) {
147 InputStream input = null;
148 OutputStream out = null;
// NOTE(review): the condition guarding this throw is not visible here.
151 throw new IOException();
// Opening the per-agent file acts as the existence check.
153 input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
154 } catch (IOException ex) {
155 logger.error(" IOException will be handled " + ex);
// Fall back to the generic file for this property kind.
157 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
158 Properties prop = new Properties();
160 if (propName.equals("consumer")) {
161 prop.setProperty("group.id", agentName);
162 prop.setProperty("zookeeper.connect", info);
164 prop.setProperty("metadata.broker.list", info);
// Target is the per-agent file; the store call itself is not visible in this excerpt.
166 out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
169 } catch (Exception e) {
170 logger.error("Exception at checkPropertiesFile " +e);
176 } catch (IOException e) {
177 logger.error("Exception occurred is " +e);
183 } catch (IOException e) {
185 logger.error("IOException" + e);
/**
 * Walks the list of known MirrorMakers and records a status on each one.
 *
 * For a MirrorMaker whose process check returns false, the consumer and
 * producer properties files are checked (refresh = false). If its whitelist is
 * non-empty it is started through MirrorMakerProcessHandler and marked
 * "RESTARTING". The "STOPPED" and "RUNNING" statuses are set in further arms
 * whose else keywords are not visible in this excerpt. Nothing is done when
 * the list is null.
 */
191 private void checkAgentProcess() {
192 logger.info("Checking MirrorMaker Process");
193 if (mirrorMakers != null) {
194 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
195 for (int i = 0; i < mirrorMakersCount; i++) {
196 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
197 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) {
// Make sure both properties files are in place before a possible start.
198 checkPropertiesFile(mm.name, "consumer", mm.consumer, false);
199 checkPropertiesFile(mm.name, "producer", mm.producer, false);
// A MirrorMaker is only started when it has a non-empty whitelist.
201 if (mm.whitelist != null && !mm.whitelist.equals("")) {
202 logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
203 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
204 mmagenthome + "/etc/" + mm.name + "consumer.properties",
205 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist);
206 mm.setStatus("RESTARTING");
209 logger.info("MirrorMaker " + mm.name + " is STOPPED");
210 mm.setStatus("STOPPED");
// NOTE(review): the interruptible call and this handler's body are not visible here;
// confirm that the interrupt is not silently dropped.
214 } catch (InterruptedException e) {
// Write the entry back at the same index after its status change.
216 mirrorMakers.getListMirrorMaker().set(i, mm);
218 logger.info("MirrorMaker " + mm.name + " is running");
219 mm.setStatus("RUNNING");
220 mirrorMakers.getListMirrorMaker().set(i, mm);
224 // Gson g = new Gson();
225 // System.out.println(g.toJson(mirrorMakers));
/**
 * Performs an HTTP GET against the topic's events URL and returns the first
 * message of the response.
 *
 * The request uses HTTP Basic authentication built from mechid and password.
 * The response body is read line by line, concatenated, and parsed as a JSON
 * array of strings.
 *
 * @param timeout value appended to the URL as the "timeout" query parameter
 * @return the first element of the parsed array when it is non-empty; a string
 *         starting with "ERROR:" (exception message plus the raw response) when
 *         any exception occurs. NOTE(review): the return for an empty array is
 *         not visible in this excerpt; callers check for null, so presumably
 *         null — confirm.
 */
228 private String subscribeTopic(String timeout) {
229 String response = "";
// Consumer group "mirrormakeragent" and consumer id "1" are fixed path segments
// (presumed meaning of the segments — confirm against the topic API);
// the expression continues on a line that is not visible in this excerpt.
231 String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout
233 String authString = this.mechid + ":" + this.password;
// getBytes() without a charset argument uses the platform default encoding.
234 String authStringEnc = Base64.encode(authString.getBytes());
235 URL url = new URL(requestURL);
236 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
237 connection.setRequestMethod("GET");
238 connection.setDoOutput(true);
239 connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
240 connection.setRequestProperty("Content-Type", "application/json");
241 InputStream content = (InputStream) connection.getInputStream();
242 BufferedReader in = new BufferedReader(new InputStreamReader(content));
// Line terminators are dropped while the body is accumulated.
245 while ((line = in.readLine()) != null) {
246 response = response + line;
249 // get message as JSON String Array
250 String[] topicMessage = g.fromJson(response, String[].class);
251 if (topicMessage.length != 0) {
252 return topicMessage[0];
// Failures are reported in-band through the returned string rather than rethrown.
254 } catch (Exception e) {
255 return "ERROR:" + e.getMessage() + " Server Response is:" + response;
/**
 * Sends the given message with an HTTP POST to the topic's events URL and
 * reads the response body.
 *
 * The request uses HTTP Basic authentication built from mechid and password
 * and declares a JSON content type.
 *
 * @param message request body to publish
 * @return a string starting with "ERROR:" followed by the exception's localized
 *         message when any exception occurs. NOTE(review): the success return
 *         is not visible in this excerpt; presumably the accumulated response
 *         body — confirm.
 */
260 private String publishTopic(String message) {
262 String requestURL = this.topicURL + "/events/" + this.topicname;
263 String authString = this.mechid + ":" + this.password;
// getBytes() without a charset argument uses the platform default encoding.
264 String authStringEnc = Base64.encode(authString.getBytes());
265 URL url = new URL(requestURL);
266 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
267 connection.setRequestMethod("POST");
268 connection.setDoOutput(true);
269 connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
270 connection.setRequestProperty("Content-Type", "application/json");
// NOTE(review): the header uses the character count while the body below is written as
// bytes; the two differ when the message contains multi-byte characters.
271 connection.setRequestProperty("Content-Length", Integer.toString(message.length()));
272 DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
273 wr.write(message.getBytes());
275 InputStream content = (InputStream) connection.getInputStream();
276 BufferedReader in = new BufferedReader(new InputStreamReader(content));
278 String response = "";
// Line terminators are dropped while the body is accumulated.
279 while ((line = in.readLine()) != null) {
280 response = response + line;
// Failures are reported in-band through the returned string rather than rethrown.
284 } catch (Exception e) {
285 return "ERROR:" + e.getLocalizedMessage();
/**
 * Polls the agent topic and dispatches each received request.
 *
 * Each poll subscribes with a timeout argument of "60000". A non-null message
 * is parsed into a map and dispatched on the first key that is present:
 * createMirrorMaker, updateMirrorMaker, deleteMirrorMaker, listAllMirrorMaker
 * or updateWhiteList. For each of these the current MirrorMaker list is
 * published back with the request's messageID, after which the messageID is
 * reset to "". A listMirrorMaker message is only logged as skipped, and there
 * is a log message for unknown requests.
 *
 * When an exception is caught, the handler logs either a retry message or,
 * once connectionattempt exceeds 5, a shutdown message.
 *
 * NOTE(review): the loop header, the counter increment and any sleep/exit calls
 * are not visible in this excerpt.
 */
289 private void readAgentTopic() {
// Counter compared against 5 in the failure handler below.
291 int connectionattempt = 0;
293 logger.info("--------------------------------");
294 logger.info("Waiting for Messages for 60 secs");
295 String topicMessage = subscribeTopic("60000");
297 LinkedTreeMap<?, ?> object = null;
298 if (topicMessage != null) {
// Generic parse first, so the request type can be detected from its top-level key.
300 object = g.fromJson(topicMessage, LinkedTreeMap.class);
302 // Cast the 1st item (since limit=1 and see the type of
304 if (object.get("createMirrorMaker") != null) {
305 logger.info("Received createMirrorMaker request from topic");
306 CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
307 createMirrorMaker(m.getCreateMirrorMaker());
// Reply with the full list tagged with the request's messageID, then clear the tag.
309 mirrorMakers.setMessageID(m.getMessageID());
310 publishTopic(g.toJson(mirrorMakers));
311 mirrorMakers.setMessageID("");
312 } else if (object.get("updateMirrorMaker") != null) {
313 logger.info("Received updateMirrorMaker request from topic");
314 UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
315 updateMirrorMaker(m.getUpdateMirrorMaker());
317 mirrorMakers.setMessageID(m.getMessageID());
318 publishTopic(g.toJson(mirrorMakers));
319 mirrorMakers.setMessageID("");
320 } else if (object.get("deleteMirrorMaker") != null) {
321 logger.info("Received deleteMirrorMaker request from topic");
322 DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
323 deleteMirrorMaker(m.getDeleteMirrorMaker());
325 mirrorMakers.setMessageID(m.getMessageID());
326 publishTopic(g.toJson(mirrorMakers));
327 mirrorMakers.setMessageID("");
328 } else if (object.get("listAllMirrorMaker") != null) {
329 logger.info("Received listALLMirrorMaker request from topic");
// No dedicated request class here: the messageID is read straight from the parsed map.
331 mirrorMakers.setMessageID((String) object.get("messageID"));
332 publishTopic(g.toJson(mirrorMakers));
333 mirrorMakers.setMessageID("");
334 } else if (object.get("updateWhiteList") != null) {
335 logger.info("Received updateWhiteList request from topic");
336 UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
337 updateWhiteList(m.getUpdateWhiteList());
339 mirrorMakers.setMessageID(m.getMessageID());
340 publishTopic(g.toJson(mirrorMakers));
341 mirrorMakers.setMessageID("");
// The replies published above carry a listMirrorMaker key (presumably, from the
// serialized ListMirrorMaker — confirm), so they are skipped when read back.
342 } else if (object.get("listMirrorMaker") != null) {
343 logger.info("Received listMirrorMaker from topic, skipping messages");
345 logger.info("Received unknown request from topic");
347 } catch (Exception ex) {
349 if (connectionattempt > 5) {
350 logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
353 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
354 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
358 // Check all MirrorMaker every min
// The counter is reset here (presumably on the non-failing path — confirm).
359 connectionattempt = 0;
364 } catch (Exception e) {
365 logger.error("Exception is : " +e);
/**
 * Registers a new MirrorMaker definition unless one with the same name is
 * already known.
 *
 * The visible steps are: search the current list for the name; add the new
 * entry (creating the list holder first when mirrorMakers is null); check both
 * properties files with refresh = true; store the serialized list under the
 * "mirrormakers" property; and write the properties to etc/mmagent.config.
 *
 * NOTE(review): the assignment of the exists flag and the guard around the
 * persistence steps are not visible in this excerpt.
 *
 * @param newMirrorMaker definition to add; its name, consumer and producer fields are read
 */
370 protected void createMirrorMaker(MirrorMaker newMirrorMaker) {
371 boolean exists = false;
372 if (mirrorMakers != null) {
373 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
374 for (int i = 0; i < mirrorMakersCount; i++) {
375 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
// Entries are matched by name.
376 if (mm.name.equals(newMirrorMaker.name)) {
378 logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
383 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
384 if (exists == false && mirrorMakers != null) {
385 mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
386 } else if (exists == false && mirrorMakers == null) {
// First MirrorMaker ever: build the holder and its list from scratch.
387 mirrorMakers = new ListMirrorMaker();
// NOTE(review): the value fetched here is replaced on the very next line.
388 ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
389 list = new ArrayList<MirrorMaker>();
390 list.add(newMirrorMaker);
391 mirrorMakers.setListMirrorMaker(list);
// refresh = true for both properties files of the new entry.
393 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
394 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
// Persist the whole list as one JSON value in the agent configuration.
397 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
398 OutputStream out = null;
400 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
401 mirrorMakerProperties.store(out, "");
402 } catch (IOException ex) {
// NOTE(review): reported with printStackTrace() rather than through the class logger.
403 ex.printStackTrace();
408 } catch (IOException e) {
/**
 * Updates the consumer and producer settings of the MirrorMaker that has the
 * same name as the given definition.
 *
 * The visible steps are: copy the new consumer/producer values onto the
 * matching entry; check both properties files with refresh = true; store the
 * serialized list under the "mirrormakers" property; write the properties to
 * etc/mmagent.config; and stop the MirrorMaker process. There is also a
 * "MirrorMaker Not found" log message.
 *
 * NOTE(review): the assignment of the exists flag, the guard that separates
 * the update path from the not-found message, and the statements after the
 * stop call are not visible in this excerpt.
 *
 * @param newMirrorMaker definition carrying the name to match and the new consumer/producer values
 */
415 private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
416 boolean exists = false;
417 if (mirrorMakers != null) {
418 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
419 for (int i = 0; i < mirrorMakersCount; i++) {
420 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
// Entries are matched by name.
421 if (mm.name.equals(newMirrorMaker.name)) {
423 mm.setConsumer(newMirrorMaker.getConsumer());
424 mm.setProducer(newMirrorMaker.getProducer());
425 mirrorMakers.getListMirrorMaker().set(i, mm);
426 logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
// refresh = true for both properties files of the updated entry.
431 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
432 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
// Persist the whole list as one JSON value in the agent configuration.
435 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
436 OutputStream out = null;
438 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
439 mirrorMakerProperties.store(out, "");
// The process is stopped after the new settings are stored.
440 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
// NOTE(review): handler body not visible; confirm the interrupt is not silently dropped.
443 } catch (InterruptedException e) {
445 } catch (IOException ex) {
// NOTE(review): reported with printStackTrace() rather than through the class logger.
446 ex.printStackTrace();
451 } catch (IOException e) {
457 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Replaces the whitelist of the MirrorMaker that has the same name as the
 * given definition.
 *
 * The visible steps are: set the new whitelist on the matching entry; store
 * the serialized list under the "mirrormakers" property; write the properties
 * to etc/mmagent.config; and stop the MirrorMaker process. There is also a
 * "MirrorMaker Not found" log message.
 *
 * NOTE(review): the assignment of the exists flag, the guard that separates
 * the update path from the not-found message, and the statements after the
 * stop call are not visible in this excerpt.
 *
 * @param newMirrorMaker definition carrying the name to match and the new whitelist
 */
461 private void updateWhiteList(MirrorMaker newMirrorMaker) {
462 boolean exists = false;
463 if (mirrorMakers != null) {
464 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
465 for (int i = 0; i < mirrorMakersCount; i++) {
466 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
// Entries are matched by name.
467 if (mm.name.equals(newMirrorMaker.name)) {
469 mm.setWhitelist(newMirrorMaker.whitelist);
470 mirrorMakers.getListMirrorMaker().set(i, mm);
471 logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
472 + newMirrorMaker.whitelist);
// Persist the whole list as one JSON value in the agent configuration.
478 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
479 OutputStream out = null;
481 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
482 mirrorMakerProperties.store(out, "");
// The process is stopped after the new whitelist is stored.
483 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
// NOTE(review): handler body not visible; confirm the interrupt is not silently dropped.
486 } catch (InterruptedException e) {
488 } catch (IOException ex) {
// NOTE(review): reported with printStackTrace() rather than through the class logger.
489 ex.printStackTrace();
494 } catch (IOException e) {
496 logger.error("IOException occered " + e);
501 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Removes the MirrorMaker that has the same name as the given definition.
 *
 * The visible steps are: remove the matching entry from the list; build File
 * objects for its consumer and producer properties files; store the serialized
 * list under the "mirrormakers" property; write the properties to
 * etc/mmagent.config; and stop the MirrorMaker process. There is also a
 * "MirrorMaker Not found" log message.
 *
 * NOTE(review): the assignment of the exists flag, the calls made on the two
 * File objects (presumably delete) and the guard that separates the delete
 * path from the not-found message are not visible in this excerpt.
 *
 * @param newMirrorMaker definition whose name identifies the entry to remove
 */
505 private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
506 boolean exists = false;
507 if (mirrorMakers != null) {
508 int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
509 for (int i = 0; i < mirrorMakersCount; i++) {
510 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
// Entries are matched by name.
511 if (mm.name.equals(newMirrorMaker.name)) {
513 mirrorMakers.getListMirrorMaker().remove(i);
514 logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
// Ends the loop: after the removal the cached count no longer matches the list size.
515 i = mirrorMakersCount;
// Per-agent consumer properties file of the removed entry.
521 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
522 File file = new File(path);
524 } catch (Exception ex) {
// Per-agent producer properties file of the removed entry.
527 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
528 File file = new File(path);
530 } catch (Exception ex) {
// Persist the whole list as one JSON value in the agent configuration.
533 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
534 OutputStream out = null;
536 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
537 mirrorMakerProperties.store(out, "");
538 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
539 } catch (IOException ex) {
// NOTE(review): reported with printStackTrace() rather than through the class logger.
540 ex.printStackTrace();
545 } catch (IOException e) {
551 logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
/**
 * Loads etc/mmagent.config from the agent home into mirrorMakerProperties and
 * copies the settings into the agent's fields.
 *
 * When the "mirrormakers" property is absent, an empty MirrorMaker list is
 * created. The serialized list is parsed from that property on the other path
 * (the else is not visible in this excerpt). kafkahome, topicURL, topicname,
 * mechid and password are then read from their properties.
 *
 * NOTE(review): the only visible content of the two IOException handlers is a
 * commented-out printStackTrace(), so load failures appear to go unreported —
 * confirm.
 */
555 private void loadProperties() {
556 InputStream input = null;
559 input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
560 mirrorMakerProperties.load(input);
562 if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
// Nothing stored yet: start with an empty list.
563 this.mirrorMakers = new ListMirrorMaker();
// NOTE(review): the value fetched here is replaced on the very next line.
564 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
565 list = new ArrayList<MirrorMaker>();
566 this.mirrorMakers.setListMirrorMaker(list);
// Restore the list from its JSON form.
568 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
569 ListMirrorMaker.class);
572 this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
573 this.topicURL = mirrorMakerProperties.getProperty("topicURL");
574 this.topicname = mirrorMakerProperties.getProperty("topicname");
575 this.mechid = mirrorMakerProperties.getProperty("mechid");
// The encryptor is configured, but the decrypt call below is commented out,
// so the password is taken from the property exactly as stored.
577 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
578 textEncryptor.setPassword(secret);
579 //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
580 this.password = mirrorMakerProperties.getProperty("password");
581 } catch (IOException ex) {
582 // ex.printStackTrace();
587 } catch (IOException e) {
588 // e.printStackTrace();