X-Git-Url: https://gerrit.onap.org/r/gitweb?p=dmaap%2Fmessagerouter%2Fmirroragent.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fnsa%2FdmaapMMAgent%2FMirrorMakerAgent.java;fp=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fnsa%2FdmaapMMAgent%2FMirrorMakerAgent.java;h=71bd85cdea9bfd7ba60e576d59ea887f8b51ef72;hp=3f993db9f926edce12bdd7e7de6f763986957710;hb=669941433fe0d154e11161e0a1095518a6199b06;hpb=09aefa740b806bd6b9fff9569b8a8cef7fe822a3 diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java index 3f993db..71bd85c 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java @@ -22,12 +22,17 @@ package com.att.nsa.dmaapMMAgent; +import java.io.BufferedReader; +import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; import java.util.ArrayList; import java.util.Properties; @@ -44,8 +49,10 @@ import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList; import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler; import com.google.gson.Gson; import com.google.gson.internal.LinkedTreeMap; +import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny; +import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; -public class MirrorMakerAgent { +public class MirrorMakerAgent {/* static final Logger logger = Logger.getLogger(MirrorMakerAgent.class); Properties mirrorMakerProperties = new Properties(); ListMirrorMaker mirrorMakers = null; @@ -55,8 +62,6 @@ public class MirrorMakerAgent { String topicname = ""; String mechid = ""; String password = ""; - TopicUtil topicUtil = new TopicUtil(); - public boolean exitLoop = false; private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J"; public static void main(String[] args) { @@ -91,8 +96,7 @@ public class MirrorMakerAgent { input = new FileInputStream(mmagenthome + "/etc/mmagent.config"); logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config"); } catch (IOException ex) { - logger.error( - mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex); + logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex); return false; } finally { if (input != null) { @@ -106,11 +110,8 @@ public class MirrorMakerAgent { loadProperties(); input = null; try { - /* - * input = new FileInputStream(kafkahome + - * "/bin/kafka-run-class.sh"); - */ - if (false) { + input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh"); + if(false) { throw new IOException(); } logger.info("kakahome is set :" + kafkahome); @@ -126,14 +127,14 @@ public class MirrorMakerAgent { } } } - String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}"); + String response = publishTopic("{\"test\":\"test\"}"); if (response.startsWith("ERROR:")) { logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:" + this.topicURL + " Error is: " + response); return false; } logger.info("Published to Topic :" + this.topicname + " Successfully"); - response = topicUtil.subscribeTopic(topicURL, topicname, "1", response, response); + response = subscribeTopic("1"); if (response != null && response.startsWith("ERROR:")) { logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:" + this.topicURL + " Error is: " + response); @@ -167,14 +168,14 @@ public class MirrorMakerAgent { prop.store(out, ""); } catch (Exception e) { - logger.error("Exception at checkPropertiesFile " + e); + logger.error("Exception at checkPropertiesFile " +e); } } finally { if (input != null) { try { input.close(); } catch (IOException e) { - logger.error("Exception occurred is " + e); + logger.error("Exception occurred is " +e); } } if (out != null) { @@ -182,7 +183,7 @@ public class MirrorMakerAgent { out.close(); } catch (IOException e) { e.printStackTrace(); - logger.error("Exception is : " + e); + logger.error("Exception is : "+e); } } } @@ -199,8 +200,7 @@ public class MirrorMakerAgent { checkPropertiesFile(mm.name, "producer", mm.producer, false); if (mm.whitelist != null && !mm.whitelist.equals("")) { - logger.info( - "MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); + logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name, mmagenthome + "/etc/" + mm.name + "consumer.properties", mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist); @@ -213,7 +213,7 @@ public class MirrorMakerAgent { try { Thread.sleep(1000); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + Thread.currentThread().interrupt(); } mirrorMakers.getListMirrorMaker().set(i, mm); } else { @@ -227,13 +227,76 @@ public class MirrorMakerAgent { // System.out.println(g.toJson(mirrorMakers)); } - public void readAgentTopic() { + private String subscribeTopic(String timeout) { + String response = ""; + try { + String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout + + "&limit=1"; + String authString = this.mechid + ":" + this.password; + String authStringEnc = Base64.encode(authString.getBytes()); + URL url = new URL(requestURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setDoOutput(true); + connection.setRequestProperty("Authorization", "Basic " + authStringEnc); + connection.setRequestProperty("Content-Type", "application/json"); + InputStream content = (InputStream) connection.getInputStream(); + BufferedReader in = new BufferedReader(new InputStreamReader(content)); + String line; + + while ((line = in.readLine()) != null) { + response = response + line; + } + Gson g = new Gson(); + // get message as JSON String Array + String[] topicMessage = g.fromJson(response, String[].class); + if (topicMessage.length != 0) { + return topicMessage[0]; + } + } catch (Exception e) { + logger.error(" Exception Occered " + e); + return "ERROR:" + e.getMessage() + " Server Response is:" + response; + } + return null; + } + + private String publishTopic(String message) { + try { + String requestURL = this.topicURL + "/events/" + this.topicname; + String authString = this.mechid + ":" + this.password; + String authStringEnc = Base64.encode(authString.getBytes()); + URL url = new URL(requestURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Authorization", "Basic " + authStringEnc); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setRequestProperty("Content-Length", Integer.toString(message.length())); + DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); + wr.write(message.getBytes()); + + InputStream content = (InputStream) connection.getInputStream(); + BufferedReader in = new BufferedReader(new InputStreamReader(content)); + String line; + String response = ""; + while ((line = in.readLine()) != null) { + response = response + line; + } + return response; + + } catch (Exception e) { + logger.error(" Exception Occered " + e); + return "ERROR:" + e.getLocalizedMessage(); + } + } + + private void readAgentTopic() { try { int connectionattempt = 0; while (true) { logger.info("--------------------------------"); logger.info("Waiting for Messages for 60 secs"); - String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password); + String topicMessage = subscribeTopic("60000"); Gson g = new Gson(); LinkedTreeMap object = null; if (topicMessage != null) { @@ -242,7 +305,49 @@ public class MirrorMakerAgent { // Cast the 1st item (since limit=1 and see the type of // object - readAgent(object, topicMessage); + if (object.get("createMirrorMaker") != null) { + logger.info("Received createMirrorMaker request from topic"); + CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class); + createMirrorMaker(m.getCreateMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("updateMirrorMaker") != null) { + logger.info("Received updateMirrorMaker request from topic"); + UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); + updateMirrorMaker(m.getUpdateMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("deleteMirrorMaker") != null) { + logger.info("Received deleteMirrorMaker request from topic"); + DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class); + deleteMirrorMaker(m.getDeleteMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("listAllMirrorMaker") != null) { + logger.info("Received listALLMirrorMaker request from topic"); + checkAgentProcess(); + mirrorMakers.setMessageID((String) object.get("messageID")); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("updateWhiteList") != null) { + logger.info("Received updateWhiteList request from topic"); + UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class); + updateWhiteList(m.getUpdateWhiteList()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("listMirrorMaker") != null) { + logger.info("Received listMirrorMaker from topic, skipping messages"); + } else { + logger.info("Received unknown request from topic"); + } } catch (Exception ex) { connectionattempt++; if (connectionattempt > 5) { @@ -258,9 +363,7 @@ public class MirrorMakerAgent { connectionattempt = 0; checkAgentProcess(); } - if (exitLoop) { - break; - } + } } catch (Exception e) { logger.error("Exception at readAgentTopic : " + e); @@ -268,55 +371,6 @@ public class MirrorMakerAgent { } - public void readAgent(LinkedTreeMap object, String topicMessage) { - - Gson g = new Gson(); - - if (object.get("createMirrorMaker") != null) { - logger.info("Received createMirrorMaker request from topic"); - CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class); - createMirrorMaker(m.getCreateMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("updateMirrorMaker") != null) { - logger.info("Received updateMirrorMaker request from topic"); - UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); - updateMirrorMaker(m.getUpdateMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("deleteMirrorMaker") != null) { - logger.info("Received deleteMirrorMaker request from topic"); - DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class); - deleteMirrorMaker(m.getDeleteMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("listAllMirrorMaker") != null) { - logger.info("Received listALLMirrorMaker request from topic"); - checkAgentProcess(); - mirrorMakers.setMessageID((String) object.get("messageID")); - topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); - } else if (object.get("updateWhiteList") != null) { - logger.info("Received updateWhiteList request from topic"); - UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class); - updateWhiteList(m.getUpdateWhiteList()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("listMirrorMaker") != null) { - logger.info("Received listMirrorMaker from topic, skipping messages"); - } else { - logger.info("Received unknown request from topic"); - } - - } - protected void createMirrorMaker(MirrorMaker newMirrorMaker) { boolean exists = false; if (mirrorMakers != null) { @@ -530,8 +584,7 @@ public class MirrorMakerAgent { BasicTextEncryptor textEncryptor = new BasicTextEncryptor(); textEncryptor.setPassword(secret); - // this.password = - // textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); + //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); this.password = mirrorMakerProperties.getProperty("password"); } catch (IOException ex) { // ex.printStackTrace(); @@ -546,4 +599,4 @@ public class MirrorMakerAgent { } } -} +*/}