Merge "Updated the testcases"
[dmaap/messagerouter/mirroragent.git] / src / main / java / com / att / nsa / dmaapMMAgent / MirrorMakerAgent.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22
23 package com.att.nsa.dmaapMMAgent;
24
25 import java.io.File;
26 import java.io.FileInputStream;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.io.InputStream;
30 import java.io.OutputStream;
31 import java.util.ArrayList;
32 import java.util.Properties;
33
34 import org.apache.log4j.Level;
35 import org.apache.log4j.Logger;
36 import org.jasypt.util.text.BasicTextEncryptor;
37
38 import com.att.nsa.dmaapMMAgent.dao.CreateMirrorMaker;
39 import com.att.nsa.dmaapMMAgent.dao.DeleteMirrorMaker;
40 import com.att.nsa.dmaapMMAgent.dao.ListMirrorMaker;
41 import com.att.nsa.dmaapMMAgent.dao.MirrorMaker;
42 import com.att.nsa.dmaapMMAgent.dao.UpdateMirrorMaker;
43 import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList;
44 import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler;
45 import com.google.gson.Gson;
46 import com.google.gson.internal.LinkedTreeMap;
47
48 public class MirrorMakerAgent {
49         static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
50         Properties mirrorMakerProperties = new Properties();
51         ListMirrorMaker mirrorMakers = null;
52         String mmagenthome = "";
53         String kafkahome = "";
54         String topicURL = "";
55         String topicname = "";
56         String mechid = "";
57         String password = "";
58         TopicUtil topicUtil = new TopicUtil();
59         public boolean exitLoop = false;
60         private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
61
62         public static void main(String[] args) {
63                 if (args != null && args.length == 2) {
64                         if (args[0].equals("-encrypt")) {
65                                 BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
66                                 textEncryptor.setPassword(secret);
67                                 String plainText = textEncryptor.encrypt(args[1]);
68                                 System.out.println("Encrypted Password is :" + plainText);
69                                 return;
70                         }
71                 } else if (args != null && args.length > 0) {
72                         System.out.println(
73                                         "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
74                         return;
75                 }
76                 MirrorMakerAgent agent = new MirrorMakerAgent();
77                 if (agent.checkStartup()) {
78                         logger.info("mmagent started, loading properties");
79                         agent.checkAgentProcess();
80                         agent.readAgentTopic();
81                 } else {
82                         System.out.println(
83                                         "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
84                 }
85         }
86
87         private boolean checkStartup() {
88                 FileInputStream input = null;
89                 try {
90                         this.mmagenthome = System.getProperty("MMAGENTHOME");
91                         input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
92                         logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
93                 } catch (IOException ex) {
94                         logger.error(
95                                         mmagenthome + "/etc/mmagent.config not found.  Set -DMMAGENTHOME and check the config file" + ex);
96                         return false;
97                 } finally {
98                         if (input != null) {
99                                 try {
100                                         input.close();
101                                 } catch (IOException e) {
102                                         logger.error(" IOException occers " + e);
103                                 }
104                         }
105                 }
106                 loadProperties();
107                 input = null;
108                 try {
109                         /*
110                          * input = new FileInputStream(kafkahome +
111                          * "/bin/kafka-run-class.sh");
112                          */
113                         if (false) {
114                                 throw new IOException();
115                         }
116                         logger.info("kakahome is set :" + kafkahome);
117                 } catch (IOException ex) {
118                         logger.error(kafkahome + "/bin/kafka-run-class.sh not found.  Make sure kafka home is set correctly" + ex);
119                         return false;
120                 } finally {
121                         if (input != null) {
122                                 try {
123                                         input.close();
124                                 } catch (IOException e) {
125                                         logger.error("IOException" + e);
126                                 }
127                         }
128                 }
129                 String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}");
130                 if (response.startsWith("ERROR:")) {
131                         logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
132                                         + this.topicURL + " Error is:  " + response);
133                         return false;
134                 }
135                 logger.info("Published to Topic :" + this.topicname + " Successfully");
136                 response = topicUtil.subscribeTopic(topicURL, topicname, "1", response, response);
137                 if (response != null && response.startsWith("ERROR:")) {
138                         logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
139                                         + this.topicURL + " Error is:  " + response);
140                         return false;
141                 }
142                 logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
143                 return true;
144         }
145
146         private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) {
147                 InputStream input = null;
148                 OutputStream out = null;
149                 try {
150                         if (refresh) {
151                                 throw new IOException();
152                         }
153                         input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
154                 } catch (IOException ex) {
155                         logger.error(" IOException will be handled " + ex);
156                         try {
157                                 input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
158                                 Properties prop = new Properties();
159                                 prop.load(input);
160                                 if (propName.equals("consumer")) {
161                                         prop.setProperty("group.id", agentName);
162                                         prop.setProperty("zookeeper.connect", info);
163                                 } else {
164                                         prop.setProperty("metadata.broker.list", info);
165                                 }
166                                 out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties");
167                                 prop.store(out, "");
168
169                         } catch (Exception e) {
170                                 logger.error("Exception at checkPropertiesFile " + e);
171                         }
172                 } finally {
173                         if (input != null) {
174                                 try {
175                                         input.close();
176                                 } catch (IOException e) {
177                                         logger.error("Exception occurred is " + e);
178                                 }
179                         }
180                         if (out != null) {
181                                 try {
182                                         out.close();
183                                 } catch (IOException e) {
184                                         e.printStackTrace();
185                                         logger.error("Exception is : " + e);
186                                 }
187                         }
188                 }
189         }
190
191         private void checkAgentProcess() {
192                 logger.info("Checking MirrorMaker Process");
193                 if (mirrorMakers != null) {
194                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
195                         for (int i = 0; i < mirrorMakersCount; i++) {
196                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
197                                 if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) {
198                                         checkPropertiesFile(mm.name, "consumer", mm.consumer, false);
199                                         checkPropertiesFile(mm.name, "producer", mm.producer, false);
200
201                                         if (mm.whitelist != null && !mm.whitelist.equals("")) {
202                                                 logger.info(
203                                                                 "MirrorMaker " + mm.name + " is not running, restarting.  Check Logs for more Details");
204                                                 MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
205                                                                 mmagenthome + "/etc/" + mm.name + "consumer.properties",
206                                                                 mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist);
207                                                 mm.setStatus("RESTARTING");
208
209                                         } else {
210                                                 logger.info("MirrorMaker " + mm.name + " is STOPPED");
211                                                 mm.setStatus("STOPPED");
212                                         }
213                                         try {
214                                                 Thread.sleep(1000);
215                                         } catch (InterruptedException e) {
216                                                 Thread.currentThread().interrupt();
217                                         }
218                                         mirrorMakers.getListMirrorMaker().set(i, mm);
219                                 } else {
220                                         logger.info("MirrorMaker " + mm.name + " is running");
221                                         mm.setStatus("RUNNING");
222                                         mirrorMakers.getListMirrorMaker().set(i, mm);
223                                 }
224                         }
225                 }
226                 // Gson g = new Gson();
227                 // System.out.println(g.toJson(mirrorMakers));
228         }
229
230         public void readAgentTopic() {
231                 try {
232                         int connectionattempt = 0;
233                         while (true) {
234                                 logger.info("--------------------------------");
235                                 logger.info("Waiting for Messages for 60 secs");
236                                 String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password);
237                                 Gson g = new Gson();
238                                 LinkedTreeMap<?, ?> object = null;
239                                 if (topicMessage != null) {
240                                         try {
241                                                 object = g.fromJson(topicMessage, LinkedTreeMap.class);
242
243                                                 // Cast the 1st item (since limit=1 and see the type of
244                                                 // object
245                                                 readAgent(object, topicMessage);
246                                         } catch (Exception ex) {
247                                                 connectionattempt++;
248                                                 if (connectionattempt > 5) {
249                                                         logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage + ex);
250                                                         return;
251                                                 }
252                                                 logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
253                                                                 + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
254                                                 Thread.sleep(60000);
255                                         }
256                                 } else {
257                                         // Check all MirrorMaker every min
258                                         connectionattempt = 0;
259                                         checkAgentProcess();
260                                 }
261                                 if (exitLoop) {
262                                         break;
263                                 }
264                         }
265                 } catch (Exception e) {
266                         logger.error("Exception at readAgentTopic : " + e);
267                 }
268
269         }
270
271         public void readAgent(LinkedTreeMap<?, ?> object, String topicMessage) {
272
273                 Gson g = new Gson();
274
275                 if (object.get("createMirrorMaker") != null) {
276                         logger.info("Received createMirrorMaker request from topic");
277                         CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
278                         createMirrorMaker(m.getCreateMirrorMaker());
279                         checkAgentProcess();
280                         mirrorMakers.setMessageID(m.getMessageID());
281                         topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
282                         mirrorMakers.setMessageID("");
283                 } else if (object.get("updateMirrorMaker") != null) {
284                         logger.info("Received updateMirrorMaker request from topic");
285                         UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
286                         updateMirrorMaker(m.getUpdateMirrorMaker());
287                         checkAgentProcess();
288                         mirrorMakers.setMessageID(m.getMessageID());
289                         topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
290                         mirrorMakers.setMessageID("");
291                 } else if (object.get("deleteMirrorMaker") != null) {
292                         logger.info("Received deleteMirrorMaker request from topic");
293                         DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
294                         deleteMirrorMaker(m.getDeleteMirrorMaker());
295                         checkAgentProcess();
296                         mirrorMakers.setMessageID(m.getMessageID());
297                         topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
298                         mirrorMakers.setMessageID("");
299                 } else if (object.get("listAllMirrorMaker") != null) {
300                         logger.info("Received listALLMirrorMaker request from topic");
301                         checkAgentProcess();
302                         mirrorMakers.setMessageID((String) object.get("messageID"));
303                         topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
304                 } else if (object.get("updateWhiteList") != null) {
305                         logger.info("Received updateWhiteList request from topic");
306                         UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
307                         updateWhiteList(m.getUpdateWhiteList());
308                         checkAgentProcess();
309                         mirrorMakers.setMessageID(m.getMessageID());
310                         topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers));
311                         mirrorMakers.setMessageID("");
312                 } else if (object.get("listMirrorMaker") != null) {
313                         logger.info("Received listMirrorMaker from topic, skipping messages");
314                 } else {
315                         logger.info("Received unknown request from topic");
316                 }
317
318         }
319
320         protected void createMirrorMaker(MirrorMaker newMirrorMaker) {
321                 boolean exists = false;
322                 if (mirrorMakers != null) {
323                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
324                         for (int i = 0; i < mirrorMakersCount; i++) {
325                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
326                                 if (mm.name.equals(newMirrorMaker.name)) {
327                                         exists = true;
328                                         logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
329                                         return;
330                                 }
331                         }
332                 }
333                 logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
334                 if (exists == false && mirrorMakers != null) {
335                         mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
336                 } else if (exists == false && mirrorMakers == null) {
337                         mirrorMakers = new ListMirrorMaker();
338                         ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
339                         list = new ArrayList();
340                         list.add(newMirrorMaker);
341                         mirrorMakers.setListMirrorMaker(list);
342                 }
343                 checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
344                 checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
345
346                 Gson g = new Gson();
347                 mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
348                 OutputStream out = null;
349                 try {
350                         out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
351                         mirrorMakerProperties.store(out, "");
352                 } catch (IOException ex) {
353                         logger.error(" IOException Occered " + ex);
354                 } finally {
355                         if (out != null) {
356                                 try {
357                                         out.close();
358                                 } catch (IOException e) {
359                                         e.printStackTrace();
360                                 }
361                         }
362                 }
363         }
364
365         private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
366                 boolean exists = false;
367                 if (mirrorMakers != null) {
368                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
369                         for (int i = 0; i < mirrorMakersCount; i++) {
370                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
371                                 if (mm.name.equals(newMirrorMaker.name)) {
372                                         exists = true;
373                                         mm.setConsumer(newMirrorMaker.getConsumer());
374                                         mm.setProducer(newMirrorMaker.getProducer());
375                                         mirrorMakers.getListMirrorMaker().set(i, mm);
376                                         logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
377                                 }
378                         }
379                 }
380                 if (exists) {
381                         checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true);
382                         checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true);
383
384                         Gson g = new Gson();
385                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
386                         OutputStream out = null;
387                         try {
388                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
389                                 mirrorMakerProperties.store(out, "");
390                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
391                                 try {
392                                         Thread.sleep(1000);
393                                 } catch (InterruptedException e) {
394                                         logger.log(Level.WARN, "Interrupted!", e);
395                                         Thread.currentThread().interrupt();
396                                 }
397                         } catch (IOException ex) {
398                                 logger.error(" IOException Occered " + ex);
399                         } finally {
400                                 if (out != null) {
401                                         try {
402                                                 out.close();
403                                         } catch (IOException e) {
404                                                 logger.error(" IOException Occered " + e);
405                                         }
406                                 }
407                         }
408                 } else {
409                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
410                 }
411         }
412
413         private void updateWhiteList(MirrorMaker newMirrorMaker) {
414                 boolean exists = false;
415                 if (mirrorMakers != null) {
416                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
417                         for (int i = 0; i < mirrorMakersCount; i++) {
418                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
419                                 if (mm.name.equals(newMirrorMaker.name)) {
420                                         exists = true;
421                                         mm.setWhitelist(newMirrorMaker.whitelist);
422                                         mirrorMakers.getListMirrorMaker().set(i, mm);
423                                         logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
424                                                         + newMirrorMaker.whitelist);
425                                 }
426                         }
427                 }
428                 if (exists) {
429                         Gson g = new Gson();
430                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
431                         OutputStream out = null;
432                         try {
433                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
434                                 mirrorMakerProperties.store(out, "");
435                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
436                                 try {
437                                         Thread.sleep(1000);
438                                 } catch (InterruptedException e) {
439                                         logger.log(Level.WARN, "Interrupted!", e);
440                                         Thread.currentThread().interrupt();
441                                 }
442                         } catch (IOException ex) {
443                                 logger.error("Exception at updateWhiteList : " + ex);
444                         } finally {
445                                 if (out != null) {
446                                         try {
447                                                 out.close();
448                                         } catch (IOException e) {
449                                                 e.printStackTrace();
450                                                 logger.error("IOException occered " + e);
451                                         }
452                                 }
453                         }
454                 } else {
455                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
456                 }
457         }
458
459         private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
460                 boolean exists = false;
461                 if (mirrorMakers != null) {
462                         int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
463                         for (int i = 0; i < mirrorMakersCount; i++) {
464                                 MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
465                                 if (mm.name.equals(newMirrorMaker.name)) {
466                                         exists = true;
467                                         mirrorMakers.getListMirrorMaker().remove(i);
468                                         logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
469                                         i = mirrorMakersCount;
470                                 }
471                         }
472                 }
473                 if (exists) {
474                         try {
475                                 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
476                                 File file = new File(path);
477                                 file.delete();
478                         } catch (Exception ex) {
479                         }
480                         try {
481                                 String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
482                                 File file = new File(path);
483                                 file.delete();
484                         } catch (Exception ex) {
485                         }
486                         Gson g = new Gson();
487                         mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
488                         OutputStream out = null;
489                         try {
490                                 out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
491                                 mirrorMakerProperties.store(out, "");
492                                 MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
493                         } catch (IOException ex) {
494                                 ex.printStackTrace();
495                         } finally {
496                                 if (out != null) {
497                                         try {
498                                                 out.close();
499                                         } catch (IOException e) {
500                                                 e.printStackTrace();
501                                         }
502                                 }
503                         }
504                 } else {
505                         logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
506                 }
507         }
508
509         private void loadProperties() {
510                 InputStream input = null;
511                 try {
512
513                         input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
514                         mirrorMakerProperties.load(input);
515                         Gson g = new Gson();
516                         if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
517                                 this.mirrorMakers = new ListMirrorMaker();
518                                 ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
519                                 list = new ArrayList<MirrorMaker>();
520                                 this.mirrorMakers.setListMirrorMaker(list);
521                         } else {
522                                 this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
523                                                 ListMirrorMaker.class);
524                         }
525
526                         this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
527                         this.topicURL = mirrorMakerProperties.getProperty("topicURL");
528                         this.topicname = mirrorMakerProperties.getProperty("topicname");
529                         this.mechid = mirrorMakerProperties.getProperty("mechid");
530
531                         BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
532                         textEncryptor.setPassword(secret);
533                         // this.password =
534                         // textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
535                         this.password = mirrorMakerProperties.getProperty("password");
536                 } catch (IOException ex) {
537                         // ex.printStackTrace();
538                 } finally {
539                         if (input != null) {
540                                 try {
541                                         input.close();
542                                 } catch (IOException e) {
543                                         // e.printStackTrace();
544                                 }
545                         }
546                 }
547
548         }
549 }