Fix bug with processing event list 54/75154/2 casablanca 1.3.2 3.0.1-ONAP
authorZlatko Murgoski <zlatko.murgoski@nokia.com>
Mon, 31 Dec 2018 10:29:52 +0000 (11:29 +0100)
committerZlatko Murgoski <zlatko.murgoski@nokia.com>
Wed, 2 Jan 2019 15:11:51 +0000 (16:11 +0100)
DMaapEventPublisher processing of eventList

Change-Id: I3aa31dd55aefdc5130fcc36712f20bd15c3f7cce
Issue-ID: DCAEGEN2-1035
Signed-off-by: Zlatko Murgoski <zlatko.murgoski@nokia.com>
pom.xml
src/main/java/org/onap/dcae/commonFunction/EventProcessor.java
src/test/java/org/onap/dcae/commonFunction/EventProcessorTest.java
version.properties

diff --git a/pom.xml b/pom.xml
index d350396..81e409a 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ limitations under the License.
     </parent>
     <groupId>org.onap.dcaegen2.collectors.ves</groupId>
     <artifactId>VESCollector</artifactId>
-    <version>1.3.1-SNAPSHOT</version>
+    <version>1.3.2-SNAPSHOT</version>
     <name>dcaegen2-collectors-ves</name>
     <description>VESCollector</description>
     <properties>
index 7d27399..0d150c5 100644 (file)
@@ -50,7 +50,6 @@ public class EventProcessor implements Runnable {
     private static final String COMMON_EVENT_HEADER = "commonEventHeader";\r
     private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");\r
 \r
-    public JSONObject event;\r
     private EventPublisher eventPublisher;\r
     private Map<String, String[]> streamidHash;\r
     private ApplicationSettings properties;\r
@@ -66,7 +65,7 @@ public class EventProcessor implements Runnable {
     public void run() {\r
         try {\r
             while (true) {\r
-                event = VesApplication.fProcessingInputQueue.take();\r
+                JSONObject event = VesApplication.fProcessingInputQueue.take();\r
                 // As long as the producer is running we remove elements from\r
                 // the queue.\r
                 log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);\r
@@ -81,7 +80,7 @@ public class EventProcessor implements Runnable {
                         .onEmpty(() -> {\r
                             log.error("No StreamID defined for publish - Message dropped" + event);\r
                         }).forEach(streamIds -> {\r
-                    sendEventsToStreams(streamIds);\r
+                    sendEventsToStreams(event, streamIds);\r
                 });\r
                 log.debug("Message published" + event);\r
             }\r
@@ -91,7 +90,7 @@ public class EventProcessor implements Runnable {
         }\r
     }\r
 \r
-    public void overrideEvent() {\r
+    public void overrideEvent(JSONObject event) {\r
         // Set collector timestamp in event payload before publish\r
         addCurrentTimeToEvent(event);\r
 \r
@@ -112,10 +111,10 @@ public class EventProcessor implements Runnable {
         log.debug("Modified event:" + event);\r
     }\r
 \r
-    private void sendEventsToStreams(String[] streamIdList) {\r
+    private void sendEventsToStreams(JSONObject event, String[] streamIdList) {\r
         for (String aStreamIdList : streamIdList) {\r
             log.info("Invoking publisher for streamId:" + aStreamIdList);\r
-            this.overrideEvent();\r
+            overrideEvent(event);\r
             eventPublisher.sendEvent(event, aStreamIdList);\r
         }\r
     }\r
index 697510c..5641a92 100644 (file)
@@ -62,19 +62,6 @@ public class EventProcessorTest {
         properties = new ApplicationSettings(new String[]{}, CLIUtils::processCmdLine);
     }
 
-    @Test
-    public void testLoad() {
-        //given
-        EventProcessor ec = new EventProcessor(mock(EventPublisher.class), properties);
-        ec.event = new org.json.JSONObject(ev);
-        //when
-        ec.overrideEvent();
-
-        //then
-        Boolean hasSourceNameNode = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName");
-        assertTrue(hasSourceNameNode);
-    }
-
     @Test
     public void shouldParseJsonEvents() throws ReflectiveOperationException {
         //given
index fee4928..ef20baa 100644 (file)
@@ -1,6 +1,6 @@
 major=1
 minor=3
-patch=1
+patch=2
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT