c205440e25f2308dffdfbf7e92b497b4dcbda9d8
[dmaap/messagerouter/mirroragent.git] / src / main / java / org / onap / dmaap / mr / dmaapMMAgent / MirrorMakerAgent.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.mr.dmaapMMAgent;
23
24 import java.io.File;
25 import java.io.FileInputStream;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.InputStream;
29 import java.io.OutputStream;
30 import java.util.ArrayList;
31 import java.util.Properties;
32
33 import org.apache.log4j.Logger;
34 import org.jasypt.util.text.BasicTextEncryptor;
35 import org.json.JSONObject;
36 import org.onap.dmaap.mr.dmaapMMAgent.dao.CreateMirrorMaker;
37 import org.onap.dmaap.mr.dmaapMMAgent.dao.DeleteMirrorMaker;
38 import org.onap.dmaap.mr.dmaapMMAgent.dao.ListMirrorMaker;
39 import org.onap.dmaap.mr.dmaapMMAgent.dao.MirrorMaker;
40 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateMirrorMaker;
41 import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateWhiteList;
42 import org.onap.dmaap.mr.dmaapMMAgent.utils.MirrorMakerProcessHandler;
43
44 import com.google.gson.Gson;
45 import com.google.gson.internal.LinkedTreeMap;
46
47 public class MirrorMakerAgent {
48         static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
49         Properties mirrorMakerProperties = new Properties();
50         ListMirrorMaker mirrorMakers = null;
51         String mmagenthome = "/opt";
52         String kafkahome = "";
53         String topicURL = "";
54         String topicname = "";
55         String mechid = "";
56         String password = "";
57         String grepLog = "";
58         public boolean exitLoop = false;
59         TopicUtil topicUtil = new TopicUtil();
60         private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
61
62         public static void main(String[] args) {
63                 if (args != null && args.length == 2) {
64                         if (args[0].equals("-encrypt")) {
65                                 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
66                                 textEncryptor.setPassword(secret);
67                                 String plainText = textEncryptor.encrypt(args[1]);
68                                 System.out.println("Encrypted Password is :" + plainText);
69                                 return;
70                         }
71                 } else if (args != null && args.length > 0) {
72                         System.out.println(
73                                         "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
74                         return;
75                 }
76                 MirrorMakerAgent agent = new MirrorMakerAgent();
77                 if (agent.checkStartup()) {
78                         logger.info("mmagent started, loading properties");
79                         try {
80                                 agent.checkAgentProcess();
81                         } catch (Exception e) {
82
83                                 e.printStackTrace();
84                         }
85                         agent.readAgentTopic();
86                 } else {
87                         System.out.println(
88                                         "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
89                 }
90         }
91
92         private boolean checkStartup() {
93                 FileInputStream input = null;
94                 try {
95                         //this.mmagenthome = System.getProperty("MMAGENTHOME");
96                         input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
97                         logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
98                 } catch (IOException ex) {
99                         logger.error(mmagenthome + "/etc/mmagent.config not found.  Set -DMMAGENTHOME and check the config file");
100                         return false;
101                 } finally {
102                         if (input != null) {
103                                 try {
104                                         input.close();
105                                 } catch (IOException e) {
106                                         logger.error("exception occured in checkStartup "+e);
107                                 }
108                         }
109                 }
110                 loadProperties();
111                 input = null;
112                 try {
113                         input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
114                         logger.info("kakahome is set :" + kafkahome);
115                 } catch (IOException ex) {
116                         logger.error(kafkahome + "/bin/kafka-run-class.sh not found.  Make sure kafka home is set correctly");
117                         return false;
118                 } finally {
119                         if (input != null) {
120                                 try {
121                                         input.close();
122                                 } catch (IOException e) {
123                                         logger.error("exception occured in checkStartup "+e);
124                                 }
125                         }
126                 }
127                 String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}");
128                 if (response.startsWith("ERROR:")) {
129                         logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
130                                         + this.topicURL + " Error is:  " + response);
131                         return false;
132                 }
133                 logger.info("Published to Topic :" + this.topicname + " Successfully");
134                 response = topicUtil.subscribeTopic(topicURL, topicname, "1", mechid, password);
135                 if (response != null && response.startsWith("ERROR:")) {
136                         logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
137                                         + this.topicURL + " Error is:  " + response);
138                         return false;
139                 }
140                 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
141                 return true;
142         }
143
144         private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) {
145                 InputStream input = null;
146                 OutputStream out = null;
147                 try {
148                         if (refresh) {
149                                 throw new IOException();
150                         }
151                         input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
152                 } catch (IOException ex) {
153                         try {
154                                 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
155                                 Properties prop = new Properties();
156                                 prop.load(input);
157                                 if (propName.equals("consumer")) {
158                                         prop.setProperty("group.id", mm.name);
159
160                                         prop.setProperty("bootstrap.servers", mm.consumer);
161                                         prop.setProperty("client.id", mm.name + "MM_consumer");
162                                 } else {
163                                         prop.setProperty("bootstrap.servers", mm.producer);
164                                         prop.setProperty("client.id", mm.name + "MM_producer");
165
166                                 }
167                                 out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
168                                 prop.store(out, "");
169
170                         } catch (Exception e) {
171                                 logger.error("exception occured in checkPropertiesFile "+e);
172                         }
173                 } finally {
174                         if (input != null) {
175                                 try {
176                                         input.close();
177                                 } catch (IOException e) {
178                                         e.printStackTrace();
179                                 }
180                         }
181                         if (out != null) {
182                                 try {
183                                         out.close();
184                                 } catch (IOException e) {
185                                         logger.error("exception occured in checkPropertiesFile "+e);
186                                 }
187                         }
188                 }
189         }
190
191         private void checkAgentProcess() throws Exception {
192                 logger.info("Checking MirrorMaker Process");
193                 if (mirrorMakers != null) {
194                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
195                         for (int i = 0; i < mirrorMakersCount; i++) {
196                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
197                                 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name, mm.enablelogCheck,
198                                                 this.grepLog) == false) {
199                                         checkPropertiesFile(mm, "consumer", false);
200                                         checkPropertiesFile(mm, "producer", false);
201
202                                         if (mm.whitelist != null && !mm.whitelist.equals("")) {
203                                                 logger.info(
204                                                                 "MirrorMaker " + mm.name + " is not running, restarting.  Check Logs for more Details");
205                                                 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
206                                                                 mmagenthome + "/etc/" + mm.name + "consumer.properties",
207                                                                 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.numStreams, mm.whitelist);
208                                                 mm.setStatus("RESTARTING");
209
210                                         } else {
211                                                 logger.info("MirrorMaker " + mm.name + " is STOPPED");
212                                                 mm.setStatus("STOPPED");
213                                         }
214                                         try {
215                                                 Thread.sleep(1000);
216                                         } catch (InterruptedException e) {
217                                         }
218                                         mirrorMakers.getListMirrorMaker().set(i, mm);
219                                 } else {
220                                         logger.info("MirrorMaker " + mm.name + " is running");
221                                         mm.setStatus("RUNNING");
222                                         mirrorMakers.getListMirrorMaker().set(i, mm);
223                                 }
224                         }
225                 }
226                 // Gson g = new Gson();
227                 // System.out.println(g.toJson(mirrorMakers));
228         }
229
230         public void readAgentTopic() {
231                 try {
232                         int connectionattempt = 0;
233                         while (true) {
234                                 logger.info("--------------------------------");
235                                 logger.info("Waiting for Messages for 60 secs");
236                                 String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password);
237                                 Gson g = new Gson();
238                                 LinkedTreeMap<?, ?> object = null;
239                                 if (topicMessage != null) {
240                                         try {
241                                                 // Check and parse if String object returned by consumer
242                                                 // API
243                                                 // else use the jsonObject
244                                                 if (topicMessage.startsWith("\"")) {
245                                                         topicMessage = g.fromJson(topicMessage.toString(), String.class);
246                                                 }
247                                                 object = g.fromJson(topicMessage, LinkedTreeMap.class);
248
249                                                 // Cast the 1st item (since limit=1 and see the type of
250                                                 // object
251                                                 readAgent(object, topicMessage);
252                                         } catch (Exception ex) {
253                                                 connectionattempt++;
254                                                 if (connectionattempt > 5) {
255                                                         logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
256                                                         return;
257                                                 }
258                                                 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
259                                                                 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
260                                                 Thread.sleep(60000);
261                                         }
262                                 } else {
263                                         // Check all MirrorMaker every min
264                                         connectionattempt = 0;
265                                         checkAgentProcess();
266                                 }
267                                 if (exitLoop) {
268                                         break;
269                                 }
270
271                         }
272                 } catch (Exception e) {
273                         e.printStackTrace();
274                 }
275
276         }
277
278         public void createMirrorMaker(MirrorMaker newMirrorMaker) {
279                 boolean exists = false;
280                 if (mirrorMakers != null) {
281                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
282                         for (int i = 0; i < mirrorMakersCount; i++) {
283                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
284                                 if (mm.name.equals(newMirrorMaker.name)) {
285                                         exists = true;
286                                         logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
287                                         return;
288                                 }
289                         }
290                 }
291                 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
292                 if (exists == false && mirrorMakers != null) {
293                         mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
294                 } else if (exists == false && mirrorMakers == null) {
295                         mirrorMakers = new ListMirrorMaker();
296                         ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
297                         list = new ArrayList<MirrorMaker>();
298                         list.add(newMirrorMaker);
299                         mirrorMakers.setListMirrorMaker(list);
300                 }
301                 checkPropertiesFile(newMirrorMaker, "consumer", true);
302                 checkPropertiesFile(newMirrorMaker, "producer", true);
303
304                 Gson g = new Gson();
305                 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
306                 OutputStream out = null;
307                 try {
308                         out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
309                         mirrorMakerProperties.store(out, "");
310                 } catch (IOException ex) {
311                         ex.printStackTrace();
312                 } finally {
313                         if (out != null) {
314                                 try {
315                                         out.close();
316                                 } catch (IOException e) {
317                                         e.printStackTrace();
318                                 }
319                         }
320                 }
321         }
322
323         private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
324                 boolean exists = false;
325                 if (mirrorMakers != null) {
326                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
327                         for (int i = 0; i < mirrorMakersCount; i++) {
328                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
329                                 if (mm.name.equals(newMirrorMaker.name)) {
330                                         exists = true;
331                                         if (null != newMirrorMaker.getConsumer()) {
332                                                 mm.setConsumer(newMirrorMaker.getConsumer());
333                                         }
334                                         if (null != newMirrorMaker.getProducer()) {
335                                                 mm.setProducer(newMirrorMaker.getProducer());
336                                         }
337                                         if (newMirrorMaker.getNumStreams() >= 1) {
338                                                 mm.setNumStreams(newMirrorMaker.getNumStreams());
339                                         }
340
341                                         mm.setEnablelogCheck(newMirrorMaker.enablelogCheck);
342
343                                         mirrorMakers.getListMirrorMaker().set(i, mm);
344                                         newMirrorMaker = mm;
345                                         logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
346                                 }
347                         }
348                 }
349                 if (exists) {
350                         checkPropertiesFile(newMirrorMaker, "consumer", true);
351                         checkPropertiesFile(newMirrorMaker, "producer", true);
352
353                         Gson g = new Gson();
354                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
355                         OutputStream out = null;
356                         try {
357                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
358                                 mirrorMakerProperties.store(out, "");
359                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
360                                 try {
361                                         Thread.sleep(1000);
362                                 } catch (InterruptedException e) {
363                                 }
364                         } catch (IOException ex) {
365                                 ex.printStackTrace();
366                         } finally {
367                                 if (out != null) {
368                                         try {
369                                                 out.close();
370                                         } catch (IOException e) {
371                                                 e.printStackTrace();
372                                         }
373                                 }
374                         }
375                 } else {
376                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
377                 }
378         }
379
380         private void updateWhiteList(MirrorMaker newMirrorMaker) {
381                 boolean exists = false;
382                 if (mirrorMakers != null) {
383                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
384                         for (int i = 0; i < mirrorMakersCount; i++) {
385                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
386                                 if (mm.name.equals(newMirrorMaker.name)) {
387                                         exists = true;
388                                         mm.setWhitelist(newMirrorMaker.whitelist);
389                                         mirrorMakers.getListMirrorMaker().set(i, mm);
390                                         logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
391                                                         + newMirrorMaker.whitelist);
392                                 }
393                         }
394                 }
395                 if (exists) {
396                         Gson g = new Gson();
397                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
398                         OutputStream out = null;
399                         try {
400                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
401                                 mirrorMakerProperties.store(out, "");
402                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
403                                 try {
404                                         Thread.sleep(1000);
405                                 } catch (InterruptedException e) {
406                                 }
407                         } catch (IOException ex) {
408                                 ex.printStackTrace();
409                         } finally {
410                                 if (out != null) {
411                                         try {
412                                                 out.close();
413                                         } catch (IOException e) {
414                                                 e.printStackTrace();
415                                         }
416                                 }
417                         }
418                 } else {
419                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
420                 }
421         }
422
423         private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
424                 boolean exists = false;
425                 if (mirrorMakers != null) {
426                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
427                         for (int i = 0; i < mirrorMakersCount; i++) {
428                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
429                                 if (mm.name.equals(newMirrorMaker.name)) {
430                                         exists = true;
431                                         mirrorMakers.getListMirrorMaker().remove(i);
432                                         logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
433                                         i = mirrorMakersCount;
434                                 }
435                         }
436                 }
437                 if (exists) {
438                         try {
439                                 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
440                                 File file = new File(path);
441                                 file.delete();
442                         } catch (Exception ex) {
443                         }
444                         try {
445                                 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
446                                 File file = new File(path);
447                                 file.delete();
448                         } catch (Exception ex) {
449                         }
450                         Gson g = new Gson();
451                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
452                         OutputStream out = null;
453                         try {
454                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
455                                 mirrorMakerProperties.store(out, "");
456                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
457                         } catch (IOException ex) {
458                                 ex.printStackTrace();
459                         } finally {
460                                 if (out != null) {
461                                         try {
462                                                 out.close();
463                                         } catch (IOException e) {
464                                                 e.printStackTrace();
465                                         }
466                                 }
467                         }
468                 } else {
469                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
470                 }
471         }
472
473         private void loadProperties() {
474                 InputStream input = null;
475                 try {
476
477                         input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
478                         mirrorMakerProperties.load(input);
479                         Gson g = new Gson();
480                         if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
481                                 this.mirrorMakers = new ListMirrorMaker();
482                                 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
483                                 list = new ArrayList<MirrorMaker>();
484                                 this.mirrorMakers.setListMirrorMaker(list);
485                         } else {
486                                 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
487                                                 ListMirrorMaker.class);
488                         }
489
490                         this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
491                         this.topicURL = mirrorMakerProperties.getProperty("topicURL");
492                         this.topicname = mirrorMakerProperties.getProperty("topicname");
493                         this.mechid = mirrorMakerProperties.getProperty("mechid");
494                         this.grepLog = mirrorMakerProperties.getProperty("grepLog");
495
496                         BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
497                         textEncryptor.setPassword(secret);
498                         this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
499                 } catch (Exception ex) {
500                         // ex.printStackTrace();
501                 } finally {
502                         if (input != null) {
503                                 try {
504                                         input.close();
505                                 } catch (IOException e) {
506                                         // e.printStackTrace();
507                                 }
508                         }
509                 }
510
511         }
512
513         public void readAgent(LinkedTreeMap<?, ?> object, String topicMessage) throws Exception{
514
515                 Gson g = new Gson();
516
517                 if (object.get("createMirrorMaker") != null) {
518                         logger.info("Received createMirrorMaker request from topic");
519                         CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
520                         createMirrorMaker(m.getCreateMirrorMaker());
521                         checkAgentProcess();
522                         mirrorMakers.setMessageID(m.getMessageID());
523                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
524                         mirrorMakers.setMessageID("");
525                 } else if (object.get("updateMirrorMaker") != null) {
526                         logger.info("Received updateMirrorMaker request from topic");
527                         UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
528                         JSONObject json = new JSONObject(topicMessage);
529                         JSONObject json2 = (JSONObject) json.get("updateMirrorMaker");
530                         if (!json2.has("numStreams")) {
531                                 m.getUpdateMirrorMaker().setNumStreams(0);
532                         }
533                         updateMirrorMaker(m.getUpdateMirrorMaker());
534                         checkAgentProcess();
535                         mirrorMakers.setMessageID(m.getMessageID());
536                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
537                         mirrorMakers.setMessageID("");
538                 } else if (object.get("deleteMirrorMaker") != null) {
539                         logger.info("Received deleteMirrorMaker request from topic");
540                         DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
541                         deleteMirrorMaker(m.getDeleteMirrorMaker());
542                         checkAgentProcess();
543                         mirrorMakers.setMessageID(m.getMessageID());
544                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
545                         mirrorMakers.setMessageID("");
546                 } else if (object.get("listAllMirrorMaker") != null) {
547                         logger.info("Received listALLMirrorMaker request from topic");
548                         checkAgentProcess();
549                         mirrorMakers.setMessageID((String) object.get("messageID"));
550                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
551                         mirrorMakers.setMessageID("");
552                 } else if (object.get("updateWhiteList") != null) {
553                         logger.info("Received updateWhiteList request from topic");
554                         UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
555                         updateWhiteList(m.getUpdateWhiteList());
556                         checkAgentProcess();
557                         mirrorMakers.setMessageID(m.getMessageID());
558                         topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers));
559                         mirrorMakers.setMessageID("");
560                 } else if (object.get("listMirrorMaker") != null) {
561                         logger.info("Received listMirrorMaker from topic, skipping messages");
562                 } else {
563                         logger.info("Received unknown request from topic");
564                 }
565
566         }
567 }