Optimize collector publish mechanism 21/28821/1
authorvagrant <vv770d@att.com>
Sat, 20 Jan 2018 14:55:43 +0000 (14:55 +0000)
committervagrant <vv770d@att.com>
Sat, 20 Jan 2018 14:59:57 +0000 (14:59 +0000)
Change-Id: I6d5fb2ad45257831e8e0ff62812df1fd1401464d
Issue-ID: DCAEGEN2-281
Signed-off-by: Vijay VK <vv770d@att.com>
dpo/spec/vescollector-componentspec.json [changed mode: 0755->0644]
etc/eventTransform.json
src/main/java/org/onap/dcae/commonFunction/CommonStartup.java
src/main/java/org/onap/dcae/commonFunction/ConfigProcessors.java
src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java [new file with mode: 0644]
src/test/java/org/onap/dcae/vestest/TestEventProcessor.java

old mode 100755 (executable)
new mode 100644 (file)
index fd99086..1973e81
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_fault"
+                               "config_key": "ves-fault"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_measurement"
+                               "config_key": "ves-measurement"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_syslog"
+                               "config_key": "ves-syslog"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_heartbeat"
+                               "config_key": "ves-heartbeat"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_other"
+                               "config_key": "ves-other"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_mobileflow"
+                               "config_key": "ves-mobileflow"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_statechange"
+                               "config_key": "ves-statechange"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_thresholdCrossingAlert"
+                               "config_key": "ves-thresholdCrossingAlert"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_voicequality"
+                               "config_key": "ves-voicequality"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_sipsignaling"
+                               "config_key": "ves-sipsignaling"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_fault_secondary"
+                               "config_key": "ves-fault-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_measurement_secondary"
+                               "config_key": "ves-measurement-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_syslog_secondary"
+                               "config_key": "ves-syslog-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_heartbeat_secondary"
+                               "config_key": "ves-heartbeat-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_other_secondary"
+                               "config_key": "ves-other-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_mobileflow_secondary"
+                               "config_key": "ves-mobileflow-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_statechange_secondary"
+                               "config_key": "ves-statechange-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_thresholdCrossingAlert_secondary"
+                               "config_key": "ves-thresholdCrossingAlert-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_voicequality_secondary"
+                               "config_key": "ves-voicequality-secondary"
                        },
                        {
                                "format": "VES_specification",
                                "version": "5.28.4",
                                "type": "message router",
-                               "config_key": "ves_sipsignaling_secondary"
+                               "config_key": "ves-sipsignaling-secondary"
                        }
                ]
        },
                },
                {
                        "name": "collector.dmaap.streamid",
-                       "value": "fault=ves_fault,ves_fault_secondary|syslog=ves_syslog,ves_syslog_secondary|heartbeat=ves_heartbeat,ves_heartbeat_secondary|measurementsForVfScaling=ves_measurement,ves_measurement_secondary|mobileFlow=ves_mobileflow,ves_mobileflow_secondary|other=ves_other,ves_other_secondary|stateChange=ves_statechange,ves_statechange_secondary|thresholdCrossingAlert=ves_thresholdCrossingAlert,ves_thresholdCrossingAlert_secondary|voiceQuality=ves_voicequality,ves_voicequality_secondary|sipSignaling=ves_sipsignaling,ves_sipsignaling_secondary",
+                       "value": "fault=ves-fault,ves-fault-secondary|syslog=ves-syslog,ves-syslog-secondary|heartbeat=ves-heartbeat,ves-heartbeat-secondary|measurementsForVfScaling=ves-measurement,ves-measurement-secondary|mobileFlow=ves-mobileflow,ves-mobileflow-secondary|other=ves-other,ves-other-secondary|stateChange=ves-statechange,ves-statechange-secondary|thresholdCrossingAlert=ves-thresholdCrossingAlert,ves-thresholdCrossingAlert-secondary|voiceQuality=ves-voicequality,ves-voicequality-secondary|sipSignaling=ves-sipsignaling,ves-sipsignaling-secondary",
                        "description": "domain-streamid mapping"
                },
                {
                },
                {
                        "name": "header.authlist",
-                       "value": "userid1,base64encodepwd1|userid2,base64encodepwd2",
-                       "description": "List of id and base64 encoded pwd"
+                       "value": "sample1,c2FtcGxlMQ==|userid1,base64encodepwd1|userid2,base64encodepwd2",
+                       "description": "List of id and base64 encoded pwd",
+                       "sourced_at_deployment":true
                },
                {
                        "name": "collector.schema.checkflag",
        ],
        "auxilary": {
                "healthcheck": {
-                       "type": "http",
+                       "type": "https",
                        "interval": "15s",
                        "timeout": "1s",
                        "endpoint": "/healthcheck"
                },
+               "volumes": [
+                       {
+                               "container": {
+                                       "bind": "/opt/app/dcae-certificate"
+                               },
+                               "host": {
+                                       "path": "/opt/app/dcae-certificate"
+                               }
+                       },
+                       {
+                               "container": {
+                                       "bind": "/opt/app/VESCollector/logs"
+                               },
+                               "host": {
+                                       "path": "/opt/data/DCAE/VESCollector/logs"
+                               }
+                       }
+               ],
                "ports": [
                        "8443:8443"
                ]
        "artifacts": [
                {
                        "type": "docker image",
-                       "uri": "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.1"
+                       "uri": "nexus.onap.org:10001/onap/org.onap.dcaegen2.collectors.ves.vescollector:1.1"
+
+
                }
        ]
 }
index 26e059b..0437d04 100644 (file)
                                "delimiter":"_"
                        }
                },
+               {
+                       "functionName": "addAttribute",
+                       "args":{
+                               "field": "event.faultFields.faultFieldsVersion",
+                               "value": "2.0",
+                               "fieldType": "number"
+                       }
+               },
+               {
+                       "functionName": "addAttribute",
+                       "args":{
+                               "field": "event.commonEventHeader.version",
+                               "value": "3.0",
+                               "fieldType": "number"
+                       }
+               },
                {
                        "functionName": "map",
                        "args":{
        }
        
 ]
+
index b4fae2a..d1dbca9 100644 (file)
@@ -195,7 +195,10 @@ public class CommonStartup extends NsaBaseEndpoint implements Runnable {
                        // Thread epThread=new Thread(ep);
                        // epThread.start();
                        executor = Executors.newFixedThreadPool(20);
-                       executor.execute(ep);
+                       //executor.execute(ep);
+                       for (int i = 0; i < 20; ++i) {
+                               executor.execute(ep);
+                               }
 
                } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException
                                | InterruptedException e) {
index 7e42bb5..4fb5adf 100644 (file)
@@ -41,9 +41,7 @@ public class ConfigProcessors {
                event = eventJson;
        }
 
-       /**
-        * 
-        */
+
        public void getValue(JSONObject J) {
                // log.info("addAttribute");
                final String field = J.getString(FIELD);
@@ -57,9 +55,7 @@ public class ConfigProcessors {
                        log.info("Filter not met");
        }
 
-       /**
-        * 
-        */
+
        public void setValue(JSONObject J) {
                // log.info("addAttribute");
                final String field = J.getString(FIELD);
@@ -73,9 +69,8 @@ public class ConfigProcessors {
                        log.info("Filter not met");
        }
 
-       /**
-        * 
-        */
+
+
        public String evaluate(String str) {
                String value = str;
                if (str.startsWith("$")) {
@@ -85,9 +80,7 @@ public class ConfigProcessors {
                return value;
        }
 
-       /**
-        * { "functionName":"suppressEvent", "args":{} }
-        */
+
        public void suppressEvent(JSONObject J) {
                // log.info("addAttribute");
                final JSONObject filter = J.optJSONObject(FILTER);
@@ -100,9 +93,7 @@ public class ConfigProcessors {
                        log.info("Filter not met");
        }
 
-       /**
-        * 
-        */
+
        public void addAttribute(JSONObject J) {
                // log.info("addAttribute begin");
                final String field = J.getString(FIELD);
@@ -119,9 +110,7 @@ public class ConfigProcessors {
                // log.info("addAttribute End");
        }
 
-       /**
-        * 
-        */
+
        public void updateAttribute(JSONObject J) {
                // log.info("updateAttribute");
                final String field = J.getString(FIELD);
@@ -135,9 +124,7 @@ public class ConfigProcessors {
                        log.info("Filter not met");
        }
 
-       /**
-        * 
-        */
+
        public void removeAttribute(JSONObject J) {
                // log.info("removeAttribute");
                final String field = J.getString(FIELD);
@@ -149,9 +136,7 @@ public class ConfigProcessors {
                        log.info("Filter not met");
        }
 
-       /**
-        * 
-        */
+
        public void renameArrayInArray(JSONObject J) // map
        {
                log.info("renameArrayInArray");
@@ -185,9 +170,7 @@ public class ConfigProcessors {
                        log.info("Filter not met");
        }
 
-       /**
-        * 
-        */
+
        public void map(JSONObject J) {
                // log.info("mapAttribute");
                final String field = J.getString(FIELD);
@@ -200,9 +183,6 @@ public class ConfigProcessors {
                        mapAttribute(J);
        }
 
-       /**
-        * 
-        */
        public String performOperation(String operation, String value) {
                log.info("performOperation");
                if (operation != null) {
@@ -214,9 +194,7 @@ public class ConfigProcessors {
                return value;
        }
 
-       /**
-        * 
-        */
+
        // public void mapAttributeToArrayAttribute(JSONObject J)
        public void mapAttribute(JSONObject J) {
                // log.info("mapAttribute");
@@ -241,9 +219,7 @@ public class ConfigProcessors {
                        log.info("Filter not met");
        }
 
-       /**
-        * 
-        */
+
        public void mapToJArray(JSONObject J) {
                log.info("mapToJArray");
                String field = J.getString(FIELD);
@@ -355,9 +331,7 @@ public class ConfigProcessors {
                        log.info("Filter not met");
        }
 
-       /**
-        * 
-        */
+
        private void removeEventKey(String field) {
                String[] keySet = field.split("\\.", field.length());
                JSONObject keySeries = event;
@@ -371,9 +345,7 @@ public class ConfigProcessors {
 
        }
 
-       /**
-        * 
-        */
+
        private boolean checkFilter(JSONObject jo, String key, String logicKey) {
                String filterValue = jo.getString(key);
                boolean retVal = true;
@@ -431,9 +403,7 @@ public class ConfigProcessors {
                return retVal;
        }
 
-       /**
-        * 
-        */
+
        public boolean isFilterMet(JSONObject jo) {
                boolean retval = true;
                // log.info("Filter==" + jo.toString());
@@ -579,3 +549,4 @@ public class ConfigProcessors {
 
        private JSONObject event = new JSONObject();
 }
+
index 79dea79..6811c67 100644 (file)
@@ -70,7 +70,7 @@ public class EventProcessor implements Runnable {
                try {
 
                        event = CommonStartup.fProcessingInputQueue.take();
-                       log.info("EventProcessor\tRemoving element: " + event);
+                       log.info("QueueSize:" + CommonStartup.fProcessingInputQueue.size()+  "\tEventProcessor\tRemoving element: " + event );
 
                        // EventPublisher Ep=new EventPublisher();
                        while (event != null) {
@@ -95,7 +95,8 @@ public class EventProcessor implements Runnable {
                                        for (String aStreamIdList : streamIdList) {
                                                log.info("Invoking publisher for streamId:" + aStreamIdList);
                                                this.overrideEvent();
-                                               EventPublisher.getInstance(aStreamIdList).sendEvent(event);
+                                               //EventPublisher.getInstance(aStreamIdList).sendEvent(event);
+                                               EventPublisherHash.getInstance().sendEvent(event, aStreamIdList);
 
                                        }
                                }
diff --git a/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java b/src/main/java/org/onap/dcae/commonFunction/EventPublisherHash.java
new file mode 100644 (file)
index 0000000..f8bdcaa
--- /dev/null
@@ -0,0 +1,179 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.commonFunction;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.clock.SaClock;
+import com.att.nsa.logging.LoggingContext;
+import com.att.nsa.logging.log4j.EcompFields;
+import com.google.gson.JsonArray;
+
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class EventPublisherHash {
+
+       private static final String VES_UNIQUE_ID = "VESuniqueId";
+       private static EventPublisherHash instance;
+       private  CambriaBatchingPublisher pub;
+
+       private String streamid = "";
+       private String ueburl = "";
+       private String topic = "";
+       private String authuser = "";
+       private String authpwd = "";
+
+       private static Logger log = LoggerFactory.getLogger(EventPublisherHash.class);
+
+       protected static HashMap<String, CambriaBatchingPublisher> map = new HashMap<String, CambriaBatchingPublisher>();
+       
+
+
+       public CambriaBatchingPublisher Dmaaphash(String newstreamid) {
+               pub = null;
+               streamid = newstreamid;
+               if (map != null && map.containsKey(streamid)){
+                       pub =  map.get(streamid);
+                       
+               }
+               else
+               {
+               
+               try {
+                       ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+                                       .get(streamid + ".cambria.url");
+
+                       if (ueburl == null) {
+                               ueburl = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+                                               .get(streamid + ".cambria.hosts");
+                       }
+                       topic = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
+                                       .getKeyValue(streamid + ".cambria.topic");
+                       authuser = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile)
+                                       .getKeyValue(streamid + ".basicAuthUsername");
+
+                       if (authuser != null) {
+                               authpwd = DmaapPropertyReader.getInstance(CommonStartup.cambriaConfigFile).dmaap_hash
+                                               .get(streamid + ".basicAuthPassword");
+                       }
+                       
+                       
+                       if (authuser != null) {
+                               log.debug(String.format("URL:%sTOPIC:%sAuthUser:%sAuthpwd:%s", ueburl, topic, authuser, authpwd));
+                               pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic).usingHttps()
+                                               .authenticatedByHttp(authuser, authpwd).logSendFailuresAfter(5)
+                                               // .logTo(log)
+                                               // .limitBatch(100, 10)
+                                               .build();
+                       } else {
+
+                               log.debug(String.format("URL:%sTOPIC:%s", ueburl, topic));
+                               pub = new CambriaClientBuilders.PublisherBuilder().usingHosts(ueburl).onTopic(topic)
+                                               // .logTo(log)
+                                               .logSendFailuresAfter(5)
+                                               // .limitBatch(100, 10)
+                                               .build();
+
+                       }
+                       
+                       map.put(streamid, pub);
+                       
+
+               }  catch ( Exception e) {
+                       log.error("CambriaClientBuilders connection reader exception : streamID - " + newstreamid + " " + e.getMessage());
+               } 
+
+               }
+               return pub;
+       
+       }
+
+       /**
+        * Returns event publisher
+        * @return event publisher
+        */
+       public static synchronized EventPublisherHash getInstance() {
+               if (instance == null) {
+                       instance = new EventPublisherHash();
+               }
+               return instance;
+
+       }
+
+
+       public synchronized void sendEvent(JSONObject event, String streamid) {
+
+               log.debug("EventPublisher.sendEvent: instance for publish is ready");
+
+               if (event.has(VES_UNIQUE_ID)) {
+                       String uuid = event.get(VES_UNIQUE_ID).toString();
+                       LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
+                       localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
+                       log.debug("Removing VESuniqueid object from event");
+                       event.remove(VES_UNIQUE_ID);
+               }
+
+               try {
+                       
+                       
+                       int pendingMsgs = Dmaaphash(streamid).send("MyPartitionKey", event.toString());
+                       // this.wait(2000);
+
+                       if (pendingMsgs > 100) {
+                               log.info("Pending Message Count=" + pendingMsgs);
+                       }
+
+                       log.info("pub.send invoked - no error");
+                       //CommonStartup.oplog.info(String.format("URL:%sTOPIC:%sEvent Published:%s", ueburl, topic, event));
+                       CommonStartup.oplog.info(String.format("StreamID:%s Event Published:%s ", streamid, event));
+               } catch (IOException  | IllegalArgumentException e) {
+                       log.error("Unable to publish event: {} streamid: {}. Exception: {}", event, streamid, e);
+                       Dmaaphash(streamid).close();
+                       map.remove(streamid);
+               } 
+
+       }
+
+       public synchronized void closePublisher() {
+
+               try {
+                       if (pub != null) {
+                               
+                               final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
+                               if (!stuck.isEmpty()) {
+                                       log.error(stuck.size() + " messages unsent");
+                               }
+                       }
+               } catch (InterruptedException | IOException e) {
+                       log.error("Caught Exception on Close event: {}", e);
+               }
+
+       }
+}
+
index 31807db..f11cc90 100644 (file)
@@ -36,10 +36,11 @@ import org.junit.Test;
 import org.onap.dcae.commonFunction.CommonStartup;
 import org.onap.dcae.commonFunction.EventProcessor;
 import org.onap.dcae.commonFunction.EventPublisher;
+import org.onap.dcae.commonFunction.EventPublisherHash;
 import org.onap.dcae.controller.LoadDynamicConfig;
 import org.onap.dcae.commonFunction.DmaapPropertyReader;
 
-
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
 import com.google.gson.JsonParser;
 
 public class TestEventProcessor {
@@ -99,6 +100,74 @@ public class TestEventProcessor {
                assertEquals(true, flag);
        }
        
+       
+       @Test
+       public void testpublisherhashclass() {
+
+           DmaapPropertyReader dr;
+           EventPublisherHash eph = null;
+           Boolean flag = false;
+               eph = EventPublisherHash.getInstance();
+               
+               
+               if (eph.equals(null))
+               {
+                       flag = false;
+               }
+               else
+               {
+                       flag = true;
+               }
+               assertEquals(true, flag);
+               
+               
+       }
+       
+       @Test
+       public void testpublisherhashclassload() {
 
+           DmaapPropertyReader dr;
+           EventPublisherHash eph = null;
+           String testinput = "src/test/resources/testDmaapConfig.json";
+           Boolean flag = false;
+           dr = new DmaapPropertyReader(testinput);
+               eph = EventPublisherHash.getInstance();
+               EventProcessor ec = new EventProcessor();
+               ec.event=new org.json.JSONObject(ev);   
+               CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig.json";
+               CambriaBatchingPublisher pub = eph.Dmaaphash("sec_fault_ueb");
+               
+               if (pub == null || pub.equals(null))
+               {
+                       flag = false;
+               }
+               else
+               {
+                       flag = true;
+               }
+               assertEquals(true, flag);
+               
+       }               
+
+       @Test
+       public void testpublisherhashSend() {
+
+           DmaapPropertyReader dr;
+           EventPublisherHash eph = null;
+           String testinput = "src/test/resources/testDmaapConfig.json";
+           Boolean flag = true;
+           dr = new DmaapPropertyReader(testinput);
+               eph = EventPublisherHash.getInstance();
+
+               
+               EventProcessor ec = new EventProcessor();
+               ec.event=new org.json.JSONObject(ev);
+               CommonStartup.cambriaConfigFile="src/test/resources/testDmaapConfig.json";
+               eph.sendEvent(ec.event, "sec_fault_ueb");
+               
+               assertEquals(true, flag);
+               
+       }
 }
 
+