Merge "changes for kafka upgrade"
[dmaap/messagerouter/mirroragent.git] / src / main / java / com / att / nsa / dmaapMMAgent / utils / MirrorMakerProcessHandler.java
index fde1145..215323e 100644 (file)
@@ -19,7 +19,6 @@
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
  *  
  *******************************************************************************/
-
 package com.att.nsa.dmaapMMAgent.utils;
 
 import java.io.BufferedReader;
@@ -28,13 +27,12 @@ import java.io.InputStreamReader;
 
 import org.apache.log4j.Logger;
 
-import com.att.nsa.dmaapMMAgent.MirrorMakerAgent;
-
-
-public class MirrorMakerProcessHandler {
+public class MirrorMakerProcessHandler {/*
        static final Logger logger = Logger.getLogger(MirrorMakerProcessHandler.class);
+       static String mmagenthome = System.getProperty("MMAGENTHOME");
 
-       public static boolean checkMirrorMakerProcess(String agentname) {
+       public static boolean checkMirrorMakerProcess(String agentname, boolean enablelogCheck, String grepLog) throws Exception {
+               String line,linelog;
                try {
                        Runtime rt = Runtime.getRuntime();
                        Process mmprocess = null;
@@ -45,22 +43,61 @@ public class MirrorMakerProcessHandler {
                                                + "~%' and caption='java.exe'\"";
                                mmprocess = rt.exec(args);
                        } else {
-                               String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" };
+                               //String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" };
+                               
+                               String args[] = { "/bin/sh", "-c", "ps -ef | grep `ps -ef |grep agentname=" + agentname + "~ | egrep -v 'grep|java' | awk '{print $2}' `| egrep -v '/bin/sh|grep' "};
+                               logger.info("CheckMM process->"+args[2]);
                                mmprocess = rt.exec(args);
                        }
 
                        InputStream is = mmprocess.getInputStream();
                        InputStreamReader isr = new InputStreamReader(is);
                        BufferedReader br = new BufferedReader(isr);
-                       String line;
+                        
                        while ((line = br.readLine()) != null) {
-                               // System.out.println(line);
-                               if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) {
-                                       return true;
-                               }
+                               System.out.println(line);
+               //              if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) {
+                                       
+                                       //If enablelogCheck Check MirrorMaker log for errors and restart mirrormaker
+                                       if(enablelogCheck) {
+                                               logger.info("Check if MM log contains any errors");
+                                               String args2[];
+                                               args2 = new String[] { "/bin/sh", "-c", "grep -i ERROR "+ mmagenthome + "/logs/" + agentname + "_MMaker.log"};
+                                               if(null!=grepLog && !grepLog.isEmpty()) 
+                                               {
+                                                        args2 = new String[]{ "/bin/sh", "-c", grepLog +" " +  mmagenthome + "/logs/" + agentname + "_MMaker.log"};
+                                               }
+                                               logger.info("Grep log args-- "+args2[2]);                               
+                                               mmprocess = rt.exec(args2);
+                                               InputStream islog = mmprocess.getInputStream();
+                                               InputStreamReader isrlog = new InputStreamReader(islog);
+                                               BufferedReader brlog = new BufferedReader(isrlog);
+                                                       
+                                                       while ((linelog = brlog.readLine()) != null) 
+                                                       {
+                                                               logger.info("Error from MM log--"+linelog);
+                                                                       
+                                                                               if (linelog.toLowerCase().contains("ERROR".toLowerCase()) ||
+                                                                                               linelog.toLowerCase().contains("Issue".toLowerCase())   ) 
+                                                                               {
+                                                                                       logger.info("MM log contains error Stop MM and restart");
+                                                                                       stopMirrorMaker(agentname);
+                                                                                       isrlog.close();
+                                                                                       brlog.close();
+                                                                                       return false;
+                                                                               } 
+                                                                               
+                                                                               
+                                                       }
+                                                       isrlog.close();
+                                                       brlog.close();
+                                       }
+                                               
+                                               return true;                                    
+                       //      }
                        }
                } catch (Exception e) {
-                       logger.error("Error at checkMirrorMakerProcess method:" + e.getMessage());
+                       e.printStackTrace();
                }
                return false;
        }
@@ -75,8 +112,14 @@ public class MirrorMakerProcessHandler {
                                                + "~%' and caption='java.exe'\" call terminate";
                                killprocess = rt.exec(args);
                        } else {
+                               //String args[] = { "/bin/sh", "-c",
+                                       //      "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" };
+                               
+                               //String args[] = { "/bin/sh", "-c",
+                               //              "kill -9 `ps -ef |grep agentname=" + agentname + "~| egrep -v 'grep|java' | awk '{print $2}'` | egrep -v '/bin/sh|grep'"};
                                String args[] = { "/bin/sh", "-c",
-                                               "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" };
+                                               "for i in `ps -ef |grep agentname="+ agentname + "~ | egrep -v 'grep|java' | awk '{print $2}'`;do kill -9 `ps -eaf | grep $i | egrep -v '/bin/sh|grep' | awk '{print $2}'` ;done"};
+                               logger.info ("Stop MM ->"+args[2]);                             
                                // args = "kill $(ps -ef |grep java |grep agentname=" +
                                // agentname + "~| awk '{print $2}')";
                                killprocess = rt.exec(args);
@@ -92,20 +135,20 @@ public class MirrorMakerProcessHandler {
 
                        logger.info("Mirror Maker " + agentname + " Stopped");
                } catch (Exception e) {
-                       logger.error("Error at stopMirrorMaker method:" + e.getMessage());
+                       e.printStackTrace();
                }
 
        }
 
        public static void startMirrorMaker(String mmagenthome, String kafkaHome, String agentName, String consumerConfig,
-                       String producerConfig, String whitelist) {
+                       String producerConfig, int numStreams,  String whitelist) {
                try {
                        Runtime rt = Runtime.getRuntime();
 
                        if (System.getProperty("os.name").contains("Windows")) {
                                String args = kafkaHome + "/bin/windows/kafka-run-class.bat -Dagentname=" + agentName
                                                + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig + " --producer.config "
-                                               + producerConfig + " --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName
+                                               + producerConfig +" --num.streams " + numStreams + "  --abort.on.send.failure true" +" --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName
                                                + "_MMaker.log";
                                final Process process = rt.exec(args);
                                new Thread() {
@@ -120,7 +163,7 @@ public class MirrorMakerProcessHandler {
                                                                // System.out.println(line);
                                                        }
                                                } catch (Exception anExc) {
-                                                       logger.error("Error at startMirrorMaker method:" + anExc.getMessage());
+                                                       anExc.printStackTrace();
                                                }
                                        }
                                }.start();
@@ -128,7 +171,7 @@ public class MirrorMakerProcessHandler {
                                String args[] = { "/bin/sh", "-c",
                                                kafkaHome + "/bin/kafka-run-class.sh -Dagentname=" + agentName
                                                                + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig
-                                                               + " --producer.config " + producerConfig + " --whitelist '" + whitelist + "' >"
+                                                               + " --producer.config " + producerConfig + " --num.streams " + numStreams + "  --abort.on.send.failure true" + " --whitelist '" + whitelist + "' >"
                                                                + mmagenthome + "/logs/" + agentName + "_MMaker.log 2>&1" };
                                final Process process = rt.exec(args);
                                new Thread() {
@@ -142,7 +185,7 @@ public class MirrorMakerProcessHandler {
                                                                // System.out.println(line);
                                                        }
                                                } catch (Exception anExc) {
-                                                       logger.error("Exception at startMirrorMaker method else part run method:" + anExc.getMessage());
+                                                       anExc.printStackTrace();
                                                }
                                        }
                                }.start();
@@ -154,4 +197,4 @@ public class MirrorMakerProcessHandler {
                        e.printStackTrace();
                }
        }
-}
+*/}