afab7f606165264346deb80aabda6a78b05e2ed8
[dmaap/messagerouter/mirroragent.git] / src / main / java / org / onap / dmaap / mr / dmaapMMAgent / MirrorMakerAgent.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.mr.dmaapMMAgent;
23
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.OutputStream;
30 import java.util.ArrayList;
31 import java.util.Properties;
32
33 import org.apache.log4j.Logger;
34 import org.jasypt.util.text.BasicTextEncryptor;
35 import org.json.JSONObject;
36 import org.onap.dmaap.mr.dmaapMMAgent.dao.CreateMirrorMaker;
37 import org.onap.dmaap.mr.dmaapMMAgent.dao.DeleteMirrorMaker;
38 import org.onap.dmaap.mr.dmaapMMAgent.dao.ListMirrorMaker;
39 import org.onap.dmaap.mr.dmaapMMAgent.dao.MirrorMaker;
40 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateMirrorMaker;
41 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateWhiteList;
42 import org.onap.dmaap.mr.dmaapMMAgent.utils.MirrorMakerProcessHandler;
43
44 import com.google.gson.Gson;
45 import com.google.gson.internal.LinkedTreeMap;
46
47 public class MirrorMakerAgent {
48         static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
49         Properties mirrorMakerProperties = new Properties();
50         ListMirrorMaker mirrorMakers = null;
51         String mmagenthome = "/opt";
52         String kafkahome = "";
53         String topicURL = "";
54         String topicname = "";
55         String mechid = "";
56         String password = "";
57         String grepLog = "";
58         public boolean exitLoop = false;
59         TopicUtil topicUtil = new TopicUtil();
60         private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
61
62         public static void main(String[] args) {
63                 if (args != null && args.length == 2) {
64                         if (args[0].equals("-encrypt")) {
65                                 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
66                                 textEncryptor.setPassword(secret);
67                                 String plainText = textEncryptor.encrypt(args[1]);
68                                 System.out.println("Encrypted Password is :" + plainText);
69                                 return;
70                         }
71                 } else if (args != null && args.length > 0) {
72                         System.out.println(
73                                         "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
74                         return;
75                 }
76                 MirrorMakerAgent agent = new MirrorMakerAgent();
77                 if (agent.checkStartup()) {
78                         logger.info("mmagent started, loading properties");
79                         try {
80                                 agent.checkAgentProcess();
81                         } catch (Exception e) {
82
83                                 e.printStackTrace();
84                         }
85                         agent.readAgentTopic();
86                 } else {
87                         System.out.println(
88                                         "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
89                 }
90         }
91
92         private boolean checkStartup() {
93                 FileInputStream input = null;
94                 try {
95                         input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
96                         logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
97                 } catch (IOException ex) {
98                         logger.error(mmagenthome + "/etc/mmagent.config not found.");
99                         return false;
100                 } finally {
101                         if (input != null) {
102                                 try {
103                                         input.close();
104                                 } catch (IOException e) {
105                                         logger.error("exception occured in checkStartup "+e);
106                                 }
107                         }
108                 }
109                 loadProperties();
110                 input = null;
111                 try {
112                         input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
113                         logger.info("kafkahome is set :" + kafkahome);
114                 } catch (IOException ex) {
115                         logger.error(kafkahome + "/bin/kafka-run-class.sh not found.  Make sure kafka home is set correctly");
116                         return false;
117                 } finally {
118                         if (input != null) {
119                                 try {
120                                         input.close();
121                                 } catch (IOException e) {
122                                         logger.error("exception occured in checkStartup "+e);
123                                 }
124                         }
125                 }
126                 String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}");
127                 if (response.startsWith("ERROR:")) {
128                         logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
129                                         + this.topicURL + " Error is:  " + response);
130                         return false;
131                 }
132                 logger.info("Published to Topic :" + this.topicname + " Successfully");
133                 response = topicUtil.subscribeTopic(topicURL, topicname, "1", mechid, password);
134                 if (response != null && response.startsWith("ERROR:")) {
135                         logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
136                                         + this.topicURL + " Error is:  " + response);
137                         return false;
138                 }
139                 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
140                 return true;
141         }
142
143         private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) {
144                 InputStream input = null;
145                 OutputStream out = null;
146                 try {
147                         if (refresh) {
148                                 throw new IOException();
149                         }
150                         input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
151                 } catch (IOException ex) {
152                         try {
153                                 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
154                                 Properties prop = new Properties();
155                                 prop.load(input);
156                                 if (propName.equals("consumer")) {
157                                         prop.setProperty("group.id", mm.name);
158
159                                         prop.setProperty("bootstrap.servers", mm.consumer);
160                                         prop.setProperty("client.id", mm.name + "MM_consumer");
161                                 } else {
162                                         prop.setProperty("bootstrap.servers", mm.producer);
163                                         prop.setProperty("client.id", mm.name + "MM_producer");
164
165                                 }
166                                 out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
167                                 prop.store(out, "");
168
169                         } catch (Exception e) {
170                                 logger.error("exception occured in checkPropertiesFile "+e);
171                         }
172                 } finally {
173                         if (input != null) {
174                                 try {
175                                         input.close();
176                                 } catch (IOException e) {
177                                         e.printStackTrace();
178                                 }
179                         }
180                         if (out != null) {
181                                 try {
182                                         out.close();
183                                 } catch (IOException e) {
184                                         logger.error("exception occured in checkPropertiesFile "+e);
185                                 }
186                         }
187                 }
188         }
189
190         private void checkAgentProcess() throws Exception {
191                 logger.info("Checking MirrorMaker Process");
192                 if (mirrorMakers != null) {
193                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
194                         for (int i = 0; i < mirrorMakersCount; i++) {
195                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
196                                 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name, mm.enablelogCheck,
197                                                 this.grepLog) == false) {
198                                         checkPropertiesFile(mm, "consumer", false);
199                                         checkPropertiesFile(mm, "producer", false);
200
201                                         if (mm.whitelist != null && !mm.whitelist.equals("")) {
202                                                 logger.info(
203                                                                 "MirrorMaker " + mm.name + " is not running, restarting.  Check Logs for more Details");
204                                                 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
205                                                                 mmagenthome + "/etc/" + mm.name + "consumer.properties",
206                                                                 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.numStreams, mm.whitelist);
207                                                 mm.setStatus("RESTARTING");
208
209                                         } else {
210                                                 logger.info("MirrorMaker " + mm.name + " is STOPPED");
211                                                 mm.setStatus("STOPPED");
212                                         }
213                                         try {
214                                                 Thread.sleep(1000);
215                                         } catch (InterruptedException e) {
216                                         }
217                                         mirrorMakers.getListMirrorMaker().set(i, mm);
218                                 } else {
219                                         logger.info("MirrorMaker " + mm.name + " is running");
220                                         mm.setStatus("RUNNING");
221                                         mirrorMakers.getListMirrorMaker().set(i, mm);
222                                 }
223                         }
224                 }
225                 // Gson g = new Gson();
226                 // System.out.println(g.toJson(mirrorMakers));
227         }
228
229         public void readAgentTopic() {
230                 try {
231                         int connectionattempt = 0;
232                         while (true) {
233                                 logger.info("--------------------------------");
234                                 logger.info("Waiting for Messages for 60 secs");
235                                 String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password);
236                                 Gson g = new Gson();
237                                 LinkedTreeMap<?, ?> object = null;
238                                 if (topicMessage != null) {
239                                         try {
240                                                 // Check and parse if String object returned by consumer
241                                                 // API
242                                                 // else use the jsonObject
243                                                 if (topicMessage.startsWith("\"")) {
244                                                         topicMessage = g.fromJson(topicMessage.toString(), String.class);
245                                                 }
246                                                 object = g.fromJson(topicMessage, LinkedTreeMap.class);
247
248                                                 // Cast the 1st item (since limit=1 and see the type of
249                                                 // object
250                                                 readAgent(object, topicMessage);
251                                         } catch (Exception ex) {
252                                                 connectionattempt++;
253                                                 if (connectionattempt > 5) {
254                                                         logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
255                                                         return;
256                                                 }
257                                                 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
258                                                                 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
259                                                 Thread.sleep(60000);
260                                         }
261                                 } else {
262                                         // Check all MirrorMaker every min
263                                         connectionattempt = 0;
264                                         checkAgentProcess();
265                                 }
266                                 if (exitLoop) {
267                                         break;
268                                 }
269
270                         }
271                 } catch (Exception e) {
272                         e.printStackTrace();
273                 }
274
275         }
276
277         public void createMirrorMaker(MirrorMaker newMirrorMaker) {
278                 boolean exists = false;
279                 if (mirrorMakers != null) {
280                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
281                         for (int i = 0; i < mirrorMakersCount; i++) {
282                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
283                                 if (mm.name.equals(newMirrorMaker.name)) {
284                                         exists = true;
285                                         logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
286                                         return;
287                                 }
288                         }
289                 }
290                 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
291                 if (exists == false && mirrorMakers != null) {
292                         mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
293                 } else if (exists == false && mirrorMakers == null) {
294                         mirrorMakers = new ListMirrorMaker();
295                         ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
296                         list = new ArrayList<MirrorMaker>();
297                         list.add(newMirrorMaker);
298                         mirrorMakers.setListMirrorMaker(list);
299                 }
300                 checkPropertiesFile(newMirrorMaker, "consumer", true);
301                 checkPropertiesFile(newMirrorMaker, "producer", true);
302
303                 Gson g = new Gson();
304                 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
305                 OutputStream out = null;
306                 try {
307                         out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
308                         mirrorMakerProperties.store(out, "");
309                 } catch (IOException ex) {
310                         ex.printStackTrace();
311                 } finally {
312                         if (out != null) {
313                                 try {
314                                         out.close();
315                                 } catch (IOException e) {
316                                         e.printStackTrace();
317                                 }
318                         }
319                 }
320         }
321
322         private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
323                 boolean exists = false;
324                 if (mirrorMakers != null) {
325                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
326                         for (int i = 0; i < mirrorMakersCount; i++) {
327                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
328                                 if (mm.name.equals(newMirrorMaker.name)) {
329                                         exists = true;
330                                         if (null != newMirrorMaker.getConsumer()) {
331                                                 mm.setConsumer(newMirrorMaker.getConsumer());
332                                         }
333                                         if (null != newMirrorMaker.getProducer()) {
334                                                 mm.setProducer(newMirrorMaker.getProducer());
335                                         }
336                                         if (newMirrorMaker.getNumStreams() >= 1) {
337                                                 mm.setNumStreams(newMirrorMaker.getNumStreams());
338                                         }
339
340                                         mm.setEnablelogCheck(newMirrorMaker.enablelogCheck);
341
342                                         mirrorMakers.getListMirrorMaker().set(i, mm);
343                                         newMirrorMaker = mm;
344                                         logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
345                                 }
346                         }
347                 }
348                 if (exists) {
349                         checkPropertiesFile(newMirrorMaker, "consumer", true);
350                         checkPropertiesFile(newMirrorMaker, "producer", true);
351
352                         Gson g = new Gson();
353                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
354                         OutputStream out = null;
355                         try {
356                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
357                                 mirrorMakerProperties.store(out, "");
358                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
359                                 try {
360                                         Thread.sleep(1000);
361                                 } catch (InterruptedException e) {
362                                 }
363                         } catch (IOException ex) {
364                                 ex.printStackTrace();
365                         } finally {
366                                 if (out != null) {
367                                         try {
368                                                 out.close();
369                                         } catch (IOException e) {
370                                                 e.printStackTrace();
371                                         }
372                                 }
373                         }
374                 } else {
375                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
376                 }
377         }
378
379         private void updateWhiteList(MirrorMaker newMirrorMaker) {
380                 boolean exists = false;
381                 if (mirrorMakers != null) {
382                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
383                         for (int i = 0; i < mirrorMakersCount; i++) {
384                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
385                                 if (mm.name.equals(newMirrorMaker.name)) {
386                                         exists = true;
387                                         mm.setWhitelist(newMirrorMaker.whitelist);
388                                         mirrorMakers.getListMirrorMaker().set(i, mm);
389                                         logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
390                                                         + newMirrorMaker.whitelist);
391                                 }
392                         }
393                 }
394                 if (exists) {
395                         Gson g = new Gson();
396                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
397                         OutputStream out = null;
398                         try {
399                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
400                                 mirrorMakerProperties.store(out, "");
401                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
402                                 try {
403                                         Thread.sleep(1000);
404                                 } catch (InterruptedException e) {
405                                 }
406                         } catch (IOException ex) {
407                                 ex.printStackTrace();
408                         } finally {
409                                 if (out != null) {
410                                         try {
411                                                 out.close();
412                                         } catch (IOException e) {
413                                                 e.printStackTrace();
414                                         }
415                                 }
416                         }
417                 } else {
418                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
419                 }
420         }
421
422         private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
423                 boolean exists = false;
424                 if (mirrorMakers != null) {
425                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
426                         for (int i = 0; i < mirrorMakersCount; i++) {
427                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
428                                 if (mm.name.equals(newMirrorMaker.name)) {
429                                         exists = true;
430                                         mirrorMakers.getListMirrorMaker().remove(i);
431                                         logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
432                                         i = mirrorMakersCount;
433                                 }
434                         }
435                 }
436                 if (exists) {
437                         try {
438                                 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
439                                 File file = new File(path);
440                                 file.delete();
441                         } catch (Exception ex) {
442                         }
443                         try {
444                                 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
445                                 File file = new File(path);
446                                 file.delete();
447                         } catch (Exception ex) {
448                         }
449                         Gson g = new Gson();
450                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
451                         OutputStream out = null;
452                         try {
453                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
454                                 mirrorMakerProperties.store(out, "");
455                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
456                         } catch (IOException ex) {
457                                 ex.printStackTrace();
458                         } finally {
459                                 if (out != null) {
460                                         try {
461                                                 out.close();
462                                         } catch (IOException e) {
463                                                 e.printStackTrace();
464                                         }
465                                 }
466                         }
467                 } else {
468                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
469                 }
470         }
471
472         private void loadProperties() {
473                 InputStream input = null;
474                 try {
475
476                         input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
477                         mirrorMakerProperties.load(input);
478                         Gson g = new Gson();
479                         if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
480                                 this.mirrorMakers = new ListMirrorMaker();
481                                 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
482                                 list = new ArrayList<MirrorMaker>();
483                                 this.mirrorMakers.setListMirrorMaker(list);
484                         } else {
485                                 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
486                                                 ListMirrorMaker.class);
487                         }
488
489                         this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
490                         this.topicURL = mirrorMakerProperties.getProperty("topicURL");
491                         this.topicname = mirrorMakerProperties.getProperty("topicname");
492                         this.mechid = mirrorMakerProperties.getProperty("mechid");
493                         this.grepLog = mirrorMakerProperties.getProperty("grepLog");
494
495                         BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
496                         textEncryptor.setPassword(secret);
497                         this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
498                 } catch (Exception ex) {
499                         // ex.printStackTrace();
500                 } finally {
501                         if (input != null) {
502                                 try {
503                                         input.close();
504                                 } catch (IOException e) {
505                                         // e.printStackTrace();
506                                 }
507                         }
508                 }
509
510         }
511
512         public void readAgent(LinkedTreeMap<?, ?> object, String topicMessage) throws Exception{
513
514                 Gson g = new Gson();
515
516                 if (object.get("createMirrorMaker") != null) {
517                         logger.info("Received createMirrorMaker request from topic");
518                         CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
519                         createMirrorMaker(m.getCreateMirrorMaker());
520                         checkAgentProcess();
521                         mirrorMakers.setMessageID(m.getMessageID());
522                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
523                         mirrorMakers.setMessageID("");
524                 } else if (object.get("updateMirrorMaker") != null) {
525                         logger.info("Received updateMirrorMaker request from topic");
526                         UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
527                         JSONObject json = new JSONObject(topicMessage);
528                         JSONObject json2 = (JSONObject) json.get("updateMirrorMaker");
529                         if (!json2.has("numStreams")) {
530                                 m.getUpdateMirrorMaker().setNumStreams(0);
531                         }
532                         updateMirrorMaker(m.getUpdateMirrorMaker());
533                         checkAgentProcess();
534                         mirrorMakers.setMessageID(m.getMessageID());
535                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
536                         mirrorMakers.setMessageID("");
537                 } else if (object.get("deleteMirrorMaker") != null) {
538                         logger.info("Received deleteMirrorMaker request from topic");
539                         DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
540                         deleteMirrorMaker(m.getDeleteMirrorMaker());
541                         checkAgentProcess();
542                         mirrorMakers.setMessageID(m.getMessageID());
543                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
544                         mirrorMakers.setMessageID("");
545                 } else if (object.get("listAllMirrorMaker") != null) {
546                         logger.info("Received listALLMirrorMaker request from topic");
547                         checkAgentProcess();
548                         mirrorMakers.setMessageID((String) object.get("messageID"));
549                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
550                         mirrorMakers.setMessageID("");
551                 } else if (object.get("updateWhiteList") != null) {
552                         logger.info("Received updateWhiteList request from topic");
553                         UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
554                         updateWhiteList(m.getUpdateWhiteList());
555                         checkAgentProcess();
556                         mirrorMakers.setMessageID(m.getMessageID());
557                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
558                         mirrorMakers.setMessageID("");
559                 } else if (object.get("listMirrorMaker") != null) {
560                         logger.info("Received listMirrorMaker from topic, skipping messages");
561                 } else {
562                         logger.info("Received unknown request from topic");
563                 }
564
565         }
566 }