X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fmessagerouter%2Fmirroragent.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fmr%2FdmaapMMAgent%2FMirrorMakerAgent.java;h=304a0678d4432cb60bb187b72058ce3574fda807;hp=becfbc5a4b51eb994ac13ea872094342e325d9c0;hb=62fa4be52b40c4587d2e53201ecf1f63bf424287;hpb=e5b378ca8f3c6667ab8b8bc6cfce6aead463f447 diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/MirrorMakerAgent.java index becfbc5..304a067 100644 --- a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/MirrorMakerAgent.java +++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/MirrorMakerAgent.java @@ -34,9 +34,9 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Properties; -//import org.json.JSONObject; -//import org.apache.log4j.Logger; -//import org.jasypt.util.text.BasicTextEncryptor; +import org.json.JSONObject; +import org.apache.log4j.Logger; +import org.jasypt.util.text.BasicTextEncryptor; import org.onap.dmaap.mr.dmaapMMAgent.dao.CreateMirrorMaker; import org.onap.dmaap.mr.dmaapMMAgent.dao.DeleteMirrorMaker; @@ -48,20 +48,23 @@ import org.onap.dmaap.mr.dmaapMMAgent.utils.MirrorMakerProcessHandler; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.internal.LinkedTreeMap; + import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny; import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; -public class MirrorMakerAgent {/* +public class MirrorMakerAgent { static final Logger logger = Logger.getLogger(MirrorMakerAgent.class); Properties mirrorMakerProperties = new Properties(); ListMirrorMaker mirrorMakers = null; - String mmagenthome = ""; + String mmagenthome = "/opt"; String kafkahome = ""; String topicURL = ""; String topicname = ""; String mechid = ""; String password = ""; String grepLog = ""; + public boolean exitLoop = false; + TopicUtil topicUtil = new TopicUtil(); private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J"; public static void main(String[] args) { @@ -84,7 +87,7 @@ public class MirrorMakerAgent {/* try { agent.checkAgentProcess(); } catch (Exception e) { - + e.printStackTrace(); } agent.readAgentTopic(); @@ -97,7 +100,7 @@ public class MirrorMakerAgent {/* private boolean checkStartup() { FileInputStream input = null; try { - this.mmagenthome = System.getProperty("MMAGENTHOME"); + //this.mmagenthome = System.getProperty("MMAGENTHOME"); input = new FileInputStream(mmagenthome + "/etc/mmagent.config"); logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config"); } catch (IOException ex) { @@ -108,7 +111,7 @@ public class MirrorMakerAgent {/* try { input.close(); } catch (IOException e) { - e.printStackTrace(); + logger.error("exception occured in checkStartup "+e); } } } @@ -125,18 +128,18 @@ public class MirrorMakerAgent {/* try { input.close(); } catch (IOException e) { - e.printStackTrace(); + logger.error("exception occured in checkStartup "+e); } } } - String response = publishTopic("{\"test\":\"test\"}"); + String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}"); if (response.startsWith("ERROR:")) { logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:" + this.topicURL + " Error is: " + response); return false; } logger.info("Published to Topic :" + this.topicname + " Successfully"); - response = subscribeTopic("1"); + response = topicUtil.subscribeTopic(topicURL, topicname, "1", response, response); if (response != null && response.startsWith("ERROR:")) { logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:" + this.topicURL + " Error is: " + response); @@ -160,20 +163,20 @@ public class MirrorMakerAgent {/* Properties prop = new Properties(); prop.load(input); if (propName.equals("consumer")) { - prop.setProperty("group.id", mm.name); - - prop.setProperty("bootstrap.servers", mm.consumer); - prop.setProperty("client.id", mm.name + "MM_consumer"); - } else { + prop.setProperty("group.id", mm.name); + + prop.setProperty("bootstrap.servers", mm.consumer); + prop.setProperty("client.id", mm.name + "MM_consumer"); + } else { prop.setProperty("bootstrap.servers", mm.producer); - prop.setProperty("client.id", mm.name + "MM_producer"); - + prop.setProperty("client.id", mm.name + "MM_producer"); + } out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties"); prop.store(out, ""); } catch (Exception e) { - e.printStackTrace(); + logger.error("exception occured in checkPropertiesFile "+e); } } finally { if (input != null) { @@ -187,7 +190,7 @@ public class MirrorMakerAgent {/* try { out.close(); } catch (IOException e) { - e.printStackTrace(); + logger.error("exception occured in checkPropertiesFile "+e); } } } @@ -199,15 +202,17 @@ public class MirrorMakerAgent {/* int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); for (int i = 0; i < mirrorMakersCount; i++) { MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); - if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name,mm.enablelogCheck,this.grepLog) == false) { + if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name, mm.enablelogCheck, + this.grepLog) == false) { checkPropertiesFile(mm, "consumer", false); checkPropertiesFile(mm, "producer", false); if (mm.whitelist != null && !mm.whitelist.equals("")) { - logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); + logger.info( + "MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name, mmagenthome + "/etc/" + mm.name + "consumer.properties", - mmagenthome + "/etc/" + mm.name + "producer.properties",mm.numStreams, mm.whitelist); + mmagenthome + "/etc/" + mm.name + "producer.properties", mm.numStreams, mm.whitelist); mm.setStatus("RESTARTING"); } else { @@ -230,142 +235,28 @@ public class MirrorMakerAgent {/* // System.out.println(g.toJson(mirrorMakers)); } - private String subscribeTopic(String timeout) { - String response = ""; - try { - String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout - + "&limit=1"; - String authString = this.mechid + ":" + this.password; - String authStringEnc = Base64.encode(authString.getBytes()); - URL url = new URL(requestURL); - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("GET"); - connection.setDoOutput(true); - connection.setRequestProperty("Authorization", "Basic " + authStringEnc); - connection.setRequestProperty("Content-Type", "application/json"); - InputStream content = (InputStream) connection.getInputStream(); - BufferedReader in = new BufferedReader(new InputStreamReader(content)); - String line; - - while ((line = in.readLine()) != null) { - response = response + line; - } - Gson g = new Gson(); - //Get message as JSON Array - JsonArray topicMessage = g.fromJson(response, JsonArray.class); - if (topicMessage.size() != 0) { - return topicMessage.get(0).toString(); - } - - // get message as JSON String Array - String[] topicMessage = g.fromJson(response, String[].class); - if (topicMessage.length != 0) { - return topicMessage[0]; - } - } catch (Exception e) { - return "ERROR:" + e.getMessage() + " Server Response is:" + response; - } - return null; - } - - private String publishTopic(String message) { - try { - String requestURL = this.topicURL + "/events/" + this.topicname; - String authString = this.mechid + ":" + this.password; - String authStringEnc = Base64.encode(authString.getBytes()); - URL url = new URL(requestURL); - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setDoOutput(true); - connection.setRequestProperty("Authorization", "Basic " + authStringEnc); - connection.setRequestProperty("Content-Type", "application/json"); - connection.setRequestProperty("Content-Length", Integer.toString(message.length())); - DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); - wr.write(message.getBytes()); - - InputStream content = (InputStream) connection.getInputStream(); - BufferedReader in = new BufferedReader(new InputStreamReader(content)); - String line; - String response = ""; - while ((line = in.readLine()) != null) { - response = response + line; - } - return response; - - } catch (Exception e) { - return "ERROR:" + e.getLocalizedMessage(); - } - } - - private void readAgentTopic() { + public void readAgentTopic() { try { int connectionattempt = 0; while (true) { logger.info("--------------------------------"); logger.info("Waiting for Messages for 60 secs"); - String topicMessage = subscribeTopic("60000"); + String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password); Gson g = new Gson(); LinkedTreeMap object = null; if (topicMessage != null) { try { - //Check and parse if String object returned by consumer API - //else use the jsonObject - if( topicMessage.startsWith("\"")) - { - topicMessage = g.fromJson(topicMessage.toString(), String.class); - } + // Check and parse if String object returned by consumer + // API + // else use the jsonObject + if (topicMessage.startsWith("\"")) { + topicMessage = g.fromJson(topicMessage.toString(), String.class); + } object = g.fromJson(topicMessage, LinkedTreeMap.class); // Cast the 1st item (since limit=1 and see the type of // object - if (object.get("createMirrorMaker") != null) { - logger.info("Received createMirrorMaker request from topic"); - CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class); - createMirrorMaker(m.getCreateMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("updateMirrorMaker") != null) { - logger.info("Received updateMirrorMaker request from topic"); - UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); - JSONObject json = new JSONObject(topicMessage); - JSONObject json2 = (JSONObject) json.get("updateMirrorMaker"); - if(!json2.has("numStreams")){ - m.getUpdateMirrorMaker().setNumStreams(0); - } - updateMirrorMaker(m.getUpdateMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("deleteMirrorMaker") != null) { - logger.info("Received deleteMirrorMaker request from topic"); - DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class); - deleteMirrorMaker(m.getDeleteMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("listAllMirrorMaker") != null) { - logger.info("Received listALLMirrorMaker request from topic"); - checkAgentProcess(); - mirrorMakers.setMessageID((String) object.get("messageID")); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("updateWhiteList") != null) { - logger.info("Received updateWhiteList request from topic"); - UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class); - updateWhiteList(m.getUpdateWhiteList()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("listMirrorMaker") != null) { - logger.info("Received listMirrorMaker from topic, skipping messages"); - } else { - logger.info("Received unknown request from topic"); - } + readAgent(object, topicMessage); } catch (Exception ex) { connectionattempt++; if (connectionattempt > 5) { @@ -381,6 +272,9 @@ public class MirrorMakerAgent {/* connectionattempt = 0; checkAgentProcess(); } + if (exitLoop) { + break; + } } } catch (Exception e) { @@ -389,7 +283,7 @@ public class MirrorMakerAgent {/* } - private void createMirrorMaker(MirrorMaker newMirrorMaker) { + public void createMirrorMaker(MirrorMaker newMirrorMaker) { boolean exists = false; if (mirrorMakers != null) { int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); @@ -412,8 +306,8 @@ public class MirrorMakerAgent {/* list.add(newMirrorMaker); mirrorMakers.setListMirrorMaker(list); } - checkPropertiesFile(newMirrorMaker, "consumer", true); - checkPropertiesFile(newMirrorMaker, "producer", true); + checkPropertiesFile(newMirrorMaker, "consumer", true); + checkPropertiesFile(newMirrorMaker, "producer", true); Gson g = new Gson(); mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); @@ -442,23 +336,20 @@ public class MirrorMakerAgent {/* MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); if (mm.name.equals(newMirrorMaker.name)) { exists = true; - if(null!=newMirrorMaker.getConsumer()) - { + if (null != newMirrorMaker.getConsumer()) { mm.setConsumer(newMirrorMaker.getConsumer()); } - if(null!=newMirrorMaker.getProducer()) - { + if (null != newMirrorMaker.getProducer()) { mm.setProducer(newMirrorMaker.getProducer()); } - if(newMirrorMaker.getNumStreams()>=1) - { + if (newMirrorMaker.getNumStreams() >= 1) { mm.setNumStreams(newMirrorMaker.getNumStreams()); - } - + } + mm.setEnablelogCheck(newMirrorMaker.enablelogCheck); - + mirrorMakers.getListMirrorMaker().set(i, mm); - newMirrorMaker=mm; + newMirrorMaker = mm; logger.info("Updating MirrorMaker:" + newMirrorMaker.name); } } @@ -608,12 +499,12 @@ public class MirrorMakerAgent {/* this.topicURL = mirrorMakerProperties.getProperty("topicURL"); this.topicname = mirrorMakerProperties.getProperty("topicname"); this.mechid = mirrorMakerProperties.getProperty("mechid"); - this.grepLog= mirrorMakerProperties.getProperty("grepLog"); + this.grepLog = mirrorMakerProperties.getProperty("grepLog"); BasicTextEncryptor textEncryptor = new BasicTextEncryptor(); textEncryptor.setPassword(secret); this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); - } catch (IOException ex) { + } catch (Exception ex) { // ex.printStackTrace(); } finally { if (input != null) { @@ -626,4 +517,59 @@ public class MirrorMakerAgent {/* } } -*/} + + public void readAgent(LinkedTreeMap object, String topicMessage) throws Exception{ + + Gson g = new Gson(); + + if (object.get("createMirrorMaker") != null) { + logger.info("Received createMirrorMaker request from topic"); + CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class); + createMirrorMaker(m.getCreateMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("updateMirrorMaker") != null) { + logger.info("Received updateMirrorMaker request from topic"); + UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); + JSONObject json = new JSONObject(topicMessage); + JSONObject json2 = (JSONObject) json.get("updateMirrorMaker"); + if (!json2.has("numStreams")) { + m.getUpdateMirrorMaker().setNumStreams(0); + } + updateMirrorMaker(m.getUpdateMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("deleteMirrorMaker") != null) { + logger.info("Received deleteMirrorMaker request from topic"); + DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class); + deleteMirrorMaker(m.getDeleteMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("listAllMirrorMaker") != null) { + logger.info("Received listALLMirrorMaker request from topic"); + checkAgentProcess(); + mirrorMakers.setMessageID((String) object.get("messageID")); + topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("updateWhiteList") != null) { + logger.info("Received updateWhiteList request from topic"); + UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class); + updateWhiteList(m.getUpdateWhiteList()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + topicUtil.publishTopic(topicURL, topicname, mechid, password, g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("listMirrorMaker") != null) { + logger.info("Received listMirrorMaker from topic, skipping messages"); + } else { + logger.info("Received unknown request from topic"); + } + + } +}