X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fmessagerouter%2Fmirroragent.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fnsa%2FdmaapMMAgent%2FMirrorMakerAgent.java;fp=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fnsa%2FdmaapMMAgent%2FMirrorMakerAgent.java;h=977caaee2ffb707f47729fbcb8aafd998d82bb33;hp=71bd85cdea9bfd7ba60e576d59ea887f8b51ef72;hb=1ee517bedb7c1634e37c1124676f001b95840985;hpb=669941433fe0d154e11161e0a1095518a6199b06 diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java index 71bd85c..977caae 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent; import java.io.BufferedReader; @@ -35,10 +34,9 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Properties; - -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.jasypt.util.text.BasicTextEncryptor; +//import org.json.JSONObject; +//import org.apache.log4j.Logger; +//import org.jasypt.util.text.BasicTextEncryptor; import com.att.nsa.dmaapMMAgent.dao.CreateMirrorMaker; import com.att.nsa.dmaapMMAgent.dao.DeleteMirrorMaker; @@ -48,6 +46,7 @@ import com.att.nsa.dmaapMMAgent.dao.UpdateMirrorMaker; import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList; import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler; import com.google.gson.Gson; +import com.google.gson.JsonArray; import com.google.gson.internal.LinkedTreeMap; import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny; import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; @@ -62,6 +61,7 @@ public class MirrorMakerAgent {/* String topicname = ""; String mechid = ""; String password = ""; + String grepLog = ""; private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J"; public static void main(String[] args) { @@ -81,7 +81,12 @@ public class MirrorMakerAgent {/* MirrorMakerAgent agent = new MirrorMakerAgent(); if (agent.checkStartup()) { logger.info("mmagent started, loading properties"); - agent.checkAgentProcess(); + try { + agent.checkAgentProcess(); + } catch (Exception e) { + + e.printStackTrace(); + } agent.readAgentTopic(); } else { System.out.println( @@ -96,14 +101,14 @@ public class MirrorMakerAgent {/* input = new FileInputStream(mmagenthome + "/etc/mmagent.config"); logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config"); } catch (IOException ex) { - logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex); + logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file"); return false; } finally { if (input != null) { try { input.close(); } catch (IOException e) { - logger.error(" IOException occers " + e); + e.printStackTrace(); } } } @@ -111,19 +116,16 @@ public class MirrorMakerAgent {/* input = null; try { input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh"); - if(false) { - throw new IOException(); - } logger.info("kakahome is set :" + kafkahome); } catch (IOException ex) { - logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly" + ex); + logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly"); return false; } finally { if (input != null) { try { input.close(); } catch (IOException e) { - logger.error("IOException" + e); + e.printStackTrace(); } } } @@ -144,38 +146,41 @@ public class MirrorMakerAgent {/* return true; } - private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) { + private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) { InputStream input = null; OutputStream out = null; try { if (refresh) { throw new IOException(); } - input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties"); + input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties"); } catch (IOException ex) { - logger.error(" IOException will be handled " + ex); try { input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties"); Properties prop = new Properties(); prop.load(input); if (propName.equals("consumer")) { - prop.setProperty("group.id", agentName); - prop.setProperty("zookeeper.connect", info); - } else { - prop.setProperty("metadata.broker.list", info); + prop.setProperty("group.id", mm.name); + + prop.setProperty("bootstrap.servers", mm.consumer); + prop.setProperty("client.id", mm.name + "MM_consumer"); + } else { + prop.setProperty("bootstrap.servers", mm.producer); + prop.setProperty("client.id", mm.name + "MM_producer"); + } - out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties"); + out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties"); prop.store(out, ""); } catch (Exception e) { - logger.error("Exception at checkPropertiesFile " +e); + e.printStackTrace(); } } finally { if (input != null) { try { input.close(); } catch (IOException e) { - logger.error("Exception occurred is " +e); + e.printStackTrace(); } } if (out != null) { @@ -183,27 +188,26 @@ public class MirrorMakerAgent {/* out.close(); } catch (IOException e) { e.printStackTrace(); - logger.error("Exception is : "+e); } } } } - private void checkAgentProcess() { + private void checkAgentProcess() throws Exception { logger.info("Checking MirrorMaker Process"); if (mirrorMakers != null) { int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); for (int i = 0; i < mirrorMakersCount; i++) { MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); - if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) { - checkPropertiesFile(mm.name, "consumer", mm.consumer, false); - checkPropertiesFile(mm.name, "producer", mm.producer, false); + if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name,mm.enablelogCheck,this.grepLog) == false) { + checkPropertiesFile(mm, "consumer", false); + checkPropertiesFile(mm, "producer", false); if (mm.whitelist != null && !mm.whitelist.equals("")) { logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name, mmagenthome + "/etc/" + mm.name + "consumer.properties", - mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist); + mmagenthome + "/etc/" + mm.name + "producer.properties",mm.numStreams, mm.whitelist); mm.setStatus("RESTARTING"); } else { @@ -213,7 +217,6 @@ public class MirrorMakerAgent {/* try { Thread.sleep(1000); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } mirrorMakers.getListMirrorMaker().set(i, mm); } else { @@ -248,13 +251,18 @@ public class MirrorMakerAgent {/* response = response + line; } Gson g = new Gson(); + //Get message as JSON Array + JsonArray topicMessage = g.fromJson(response, JsonArray.class); + if (topicMessage.size() != 0) { + return topicMessage.get(0).toString(); + } + // get message as JSON String Array String[] topicMessage = g.fromJson(response, String[].class); if (topicMessage.length != 0) { return topicMessage[0]; } } catch (Exception e) { - logger.error(" Exception Occered " + e); return "ERROR:" + e.getMessage() + " Server Response is:" + response; } return null; @@ -285,7 +293,6 @@ public class MirrorMakerAgent {/* return response; } catch (Exception e) { - logger.error(" Exception Occered " + e); return "ERROR:" + e.getLocalizedMessage(); } } @@ -301,6 +308,12 @@ public class MirrorMakerAgent {/* LinkedTreeMap object = null; if (topicMessage != null) { try { + //Check and parse if String object returned by consumer API + //else use the jsonObject + if( topicMessage.startsWith("\"")) + { + topicMessage = g.fromJson(topicMessage.toString(), String.class); + } object = g.fromJson(topicMessage, LinkedTreeMap.class); // Cast the 1st item (since limit=1 and see the type of @@ -316,6 +329,11 @@ public class MirrorMakerAgent {/* } else if (object.get("updateMirrorMaker") != null) { logger.info("Received updateMirrorMaker request from topic"); UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); + JSONObject json = new JSONObject(topicMessage); + JSONObject json2 = (JSONObject) json.get("updateMirrorMaker"); + if(!json2.has("numStreams")){ + m.getUpdateMirrorMaker().setNumStreams(0); + } updateMirrorMaker(m.getUpdateMirrorMaker()); checkAgentProcess(); mirrorMakers.setMessageID(m.getMessageID()); @@ -351,7 +369,7 @@ public class MirrorMakerAgent {/* } catch (Exception ex) { connectionattempt++; if (connectionattempt > 5) { - logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage + ex); + logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage); return; } logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt @@ -366,12 +384,12 @@ public class MirrorMakerAgent {/* } } catch (Exception e) { - logger.error("Exception at readAgentTopic : " + e); + e.printStackTrace(); } } - protected void createMirrorMaker(MirrorMaker newMirrorMaker) { + private void createMirrorMaker(MirrorMaker newMirrorMaker) { boolean exists = false; if (mirrorMakers != null) { int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); @@ -390,12 +408,12 @@ public class MirrorMakerAgent {/* } else if (exists == false && mirrorMakers == null) { mirrorMakers = new ListMirrorMaker(); ArrayList list = mirrorMakers.getListMirrorMaker(); - list = new ArrayList(); + list = new ArrayList(); list.add(newMirrorMaker); mirrorMakers.setListMirrorMaker(list); } - checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true); - checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true); + checkPropertiesFile(newMirrorMaker, "consumer", true); + checkPropertiesFile(newMirrorMaker, "producer", true); Gson g = new Gson(); mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); @@ -404,7 +422,7 @@ public class MirrorMakerAgent {/* out = new FileOutputStream(mmagenthome + "/etc/mmagent.config"); mirrorMakerProperties.store(out, ""); } catch (IOException ex) { - logger.error(" IOException Occered " + ex); + ex.printStackTrace(); } finally { if (out != null) { try { @@ -424,16 +442,30 @@ public class MirrorMakerAgent {/* MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); if (mm.name.equals(newMirrorMaker.name)) { exists = true; - mm.setConsumer(newMirrorMaker.getConsumer()); - mm.setProducer(newMirrorMaker.getProducer()); + if(null!=newMirrorMaker.getConsumer()) + { + mm.setConsumer(newMirrorMaker.getConsumer()); + } + if(null!=newMirrorMaker.getProducer()) + { + mm.setProducer(newMirrorMaker.getProducer()); + } + if(newMirrorMaker.getNumStreams()>=1) + { + mm.setNumStreams(newMirrorMaker.getNumStreams()); + } + + mm.setEnablelogCheck(newMirrorMaker.enablelogCheck); + mirrorMakers.getListMirrorMaker().set(i, mm); + newMirrorMaker=mm; logger.info("Updating MirrorMaker:" + newMirrorMaker.name); } } } if (exists) { - checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true); - checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true); + checkPropertiesFile(newMirrorMaker, "consumer", true); + checkPropertiesFile(newMirrorMaker, "producer", true); Gson g = new Gson(); mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); @@ -445,17 +477,15 @@ public class MirrorMakerAgent {/* try { Thread.sleep(1000); } catch (InterruptedException e) { - logger.log(Level.WARN, "Interrupted!", e); - Thread.currentThread().interrupt(); } } catch (IOException ex) { - logger.error(" IOException Occered " + ex); + ex.printStackTrace(); } finally { if (out != null) { try { out.close(); } catch (IOException e) { - logger.error(" IOException Occered " + e); + e.printStackTrace(); } } } @@ -490,18 +520,15 @@ public class MirrorMakerAgent {/* try { Thread.sleep(1000); } catch (InterruptedException e) { - logger.log(Level.WARN, "Interrupted!", e); - Thread.currentThread().interrupt(); } } catch (IOException ex) { - logger.error("Exception at updateWhiteList : " + ex); + ex.printStackTrace(); } finally { if (out != null) { try { out.close(); } catch (IOException e) { e.printStackTrace(); - logger.error("IOException occered " + e); } } } @@ -581,11 +608,11 @@ public class MirrorMakerAgent {/* this.topicURL = mirrorMakerProperties.getProperty("topicURL"); this.topicname = mirrorMakerProperties.getProperty("topicname"); this.mechid = mirrorMakerProperties.getProperty("mechid"); + this.grepLog= mirrorMakerProperties.getProperty("grepLog"); BasicTextEncryptor textEncryptor = new BasicTextEncryptor(); textEncryptor.setPassword(secret); - //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); - this.password = mirrorMakerProperties.getProperty("password"); + this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); } catch (IOException ex) { // ex.printStackTrace(); } finally {