Merge "changes for kafka upgrade"
authorRam Koya <rk541m@att.com>
Wed, 20 Jun 2018 21:24:37 +0000 (21:24 +0000)
committerGerrit Code Review <gerrit@onap.org>
Wed, 20 Jun 2018 21:24:37 +0000 (21:24 +0000)
1  2 
src/main/java/com/att/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java
src/main/java/com/att/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java

   *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
   *  
   *******************************************************************************/
-  
  package com.att.nsa.dmaapMMAgent.dao;
  
  public class CreateMirrorMaker {
        String messageID;
 -      MirrorMaker createMirrorMaker;
 +      MirrorMaker MirrorMaker;
  
        public MirrorMaker getCreateMirrorMaker() {
 -              return createMirrorMaker;
 +              return MirrorMaker;
        }
  
        public void setCreateMirrorMaker(MirrorMaker createMirrorMaker) {
 -              this.createMirrorMaker = createMirrorMaker;
 +              this.MirrorMaker = createMirrorMaker;
        }
  
        public String getMessageID() {
@@@ -19,7 -19,6 +19,6 @@@
   *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
   *  
   *******************************************************************************/
  package com.att.nsa.dmaapMMAgent.utils;
  
  import java.io.BufferedReader;
@@@ -28,13 -27,12 +27,12 @@@ import java.io.InputStreamReader
  
  import org.apache.log4j.Logger;
  
- import com.att.nsa.dmaapMMAgent.MirrorMakerAgent;
- public class MirrorMakerProcessHandler {
+ public class MirrorMakerProcessHandler {/*
        static final Logger logger = Logger.getLogger(MirrorMakerProcessHandler.class);
+       static String mmagenthome = System.getProperty("MMAGENTHOME");
  
-       public static boolean checkMirrorMakerProcess(String agentname) {
+       public static boolean checkMirrorMakerProcess(String agentname, boolean enablelogCheck, String grepLog) throws Exception {
+               String line,linelog;
                try {
                        Runtime rt = Runtime.getRuntime();
                        Process mmprocess = null;
                                                + "~%' and caption='java.exe'\"";
                                mmprocess = rt.exec(args);
                        } else {
-                               String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" };
+                               //String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" };
+                               
+                               String args[] = { "/bin/sh", "-c", "ps -ef | grep `ps -ef |grep agentname=" + agentname + "~ | egrep -v 'grep|java' | awk '{print $2}' `| egrep -v '/bin/sh|grep' "};
+                               logger.info("CheckMM process->"+args[2]);
                                mmprocess = rt.exec(args);
                        }
  
                        InputStream is = mmprocess.getInputStream();
                        InputStreamReader isr = new InputStreamReader(is);
                        BufferedReader br = new BufferedReader(isr);
-                       String line;
+                        
                        while ((line = br.readLine()) != null) {
-                               // System.out.println(line);
-                               if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) {
-                                       return true;
-                               }
+                               System.out.println(line);
+               //              if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) {
+                                       
+                                       //If enablelogCheck Check MirrorMaker log for errors and restart mirrormaker
+                                       if(enablelogCheck) {
+                                               logger.info("Check if MM log contains any errors");
+                                               String args2[];
+                                               args2 = new String[] { "/bin/sh", "-c", "grep -i ERROR "+ mmagenthome + "/logs/" + agentname + "_MMaker.log"};
+                                               if(null!=grepLog && !grepLog.isEmpty()) 
+                                               {
+                                                        args2 = new String[]{ "/bin/sh", "-c", grepLog +" " +  mmagenthome + "/logs/" + agentname + "_MMaker.log"};
+                                               }
+                                               logger.info("Grep log args-- "+args2[2]);                               
+                                               mmprocess = rt.exec(args2);
+                                               InputStream islog = mmprocess.getInputStream();
+                                               InputStreamReader isrlog = new InputStreamReader(islog);
+                                               BufferedReader brlog = new BufferedReader(isrlog);
+                                                       
+                                                       while ((linelog = brlog.readLine()) != null) 
+                                                       {
+                                                               logger.info("Error from MM log--"+linelog);
+                                                                       
+                                                                               if (linelog.toLowerCase().contains("ERROR".toLowerCase()) ||
+                                                                                               linelog.toLowerCase().contains("Issue".toLowerCase())   ) 
+                                                                               {
+                                                                                       logger.info("MM log contains error Stop MM and restart");
+                                                                                       stopMirrorMaker(agentname);
+                                                                                       isrlog.close();
+                                                                                       brlog.close();
+                                                                                       return false;
+                                                                               } 
+                                                                               
+                                                                               
+                                                       }
+                                                       isrlog.close();
+                                                       brlog.close();
+                                       }
+                                               
+                                               return true;                                    
+                       //      }
                        }
                } catch (Exception e) {
-                       logger.error("Error at checkMirrorMakerProcess method:" + e.getMessage());
+                       e.printStackTrace();
                }
                return false;
        }
                                                + "~%' and caption='java.exe'\" call terminate";
                                killprocess = rt.exec(args);
                        } else {
+                               //String args[] = { "/bin/sh", "-c",
+                                       //      "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" };
+                               
+                               //String args[] = { "/bin/sh", "-c",
+                               //              "kill -9 `ps -ef |grep agentname=" + agentname + "~| egrep -v 'grep|java' | awk '{print $2}'` | egrep -v '/bin/sh|grep'"};
                                String args[] = { "/bin/sh", "-c",
-                                               "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" };
+                                               "for i in `ps -ef |grep agentname="+ agentname + "~ | egrep -v 'grep|java' | awk '{print $2}'`;do kill -9 `ps -eaf | grep $i | egrep -v '/bin/sh|grep' | awk '{print $2}'` ;done"};
+                               logger.info ("Stop MM ->"+args[2]);                             
                                // args = "kill $(ps -ef |grep java |grep agentname=" +
                                // agentname + "~| awk '{print $2}')";
                                killprocess = rt.exec(args);
  
                        logger.info("Mirror Maker " + agentname + " Stopped");
                } catch (Exception e) {
-                       logger.error("Error at stopMirrorMaker method:" + e.getMessage());
+                       e.printStackTrace();
                }
  
        }
  
        public static void startMirrorMaker(String mmagenthome, String kafkaHome, String agentName, String consumerConfig,
-                       String producerConfig, String whitelist) {
+                       String producerConfig, int numStreams,  String whitelist) {
                try {
                        Runtime rt = Runtime.getRuntime();
  
                        if (System.getProperty("os.name").contains("Windows")) {
                                String args = kafkaHome + "/bin/windows/kafka-run-class.bat -Dagentname=" + agentName
                                                + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig + " --producer.config "
-                                               + producerConfig + " --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName
+                                               + producerConfig +" --num.streams " + numStreams + "  --abort.on.send.failure true" +" --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName
                                                + "_MMaker.log";
                                final Process process = rt.exec(args);
                                new Thread() {
 +                                      @Override
                                        public void run() {
                                                try {
                                                        InputStream is = process.getInputStream();
                                                                // System.out.println(line);
                                                        }
                                                } catch (Exception anExc) {
-                                                       logger.error("Error at startMirrorMaker method:" + anExc.getMessage());
+                                                       anExc.printStackTrace();
                                                }
                                        }
                                }.start();
                                String args[] = { "/bin/sh", "-c",
                                                kafkaHome + "/bin/kafka-run-class.sh -Dagentname=" + agentName
                                                                + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig
-                                                               + " --producer.config " + producerConfig + " --whitelist '" + whitelist + "' >"
+                                                               + " --producer.config " + producerConfig + " --num.streams " + numStreams + "  --abort.on.send.failure true" + " --whitelist '" + whitelist + "' >"
                                                                + mmagenthome + "/logs/" + agentName + "_MMaker.log 2>&1" };
                                final Process process = rt.exec(args);
                                new Thread() {
                                                                // System.out.println(line);
                                                        }
                                                } catch (Exception anExc) {
-                                                       logger.error("Exception at startMirrorMaker method else part run method:" + anExc.getMessage());
+                                                       anExc.printStackTrace();
                                                }
                                        }
                                }.start();
                        e.printStackTrace();
                }
        }
- }
*/}