</parent>
<groupId>org.onap.dcaegen2.collectors.ves</groupId>
<artifactId>VESCollector</artifactId>
- <version>1.3.1-SNAPSHOT</version>
+ <version>1.3.2-SNAPSHOT</version>
<name>dcaegen2-collectors-ves</name>
<description>VESCollector</description>
<properties>
private static final String COMMON_EVENT_HEADER = "commonEventHeader";\r
private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");\r
\r
- public JSONObject event;\r
private EventPublisher eventPublisher;\r
private Map<String, String[]> streamidHash;\r
private ApplicationSettings properties;\r
public void run() {\r
try {\r
while (true) {\r
- event = VesApplication.fProcessingInputQueue.take();\r
+ JSONObject event = VesApplication.fProcessingInputQueue.take();\r
// As long as the producer is running we remove elements from\r
// the queue.\r
log.info("QueueSize:" + VesApplication.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + event);\r
.onEmpty(() -> {\r
log.error("No StreamID defined for publish - Message dropped" + event);\r
}).forEach(streamIds -> {\r
- sendEventsToStreams(streamIds);\r
+ sendEventsToStreams(event, streamIds);\r
});\r
log.debug("Message published" + event);\r
}\r
}\r
}\r
\r
- public void overrideEvent() {\r
+ public void overrideEvent(JSONObject event) {\r
// Set collector timestamp in event payload before publish\r
addCurrentTimeToEvent(event);\r
\r
log.debug("Modified event:" + event);\r
}\r
\r
- private void sendEventsToStreams(String[] streamIdList) {\r
+ private void sendEventsToStreams(JSONObject event, String[] streamIdList) {\r
for (String aStreamIdList : streamIdList) {\r
log.info("Invoking publisher for streamId:" + aStreamIdList);\r
- this.overrideEvent();\r
+ overrideEvent(event);\r
eventPublisher.sendEvent(event, aStreamIdList);\r
}\r
}\r
properties = new ApplicationSettings(new String[]{}, CLIUtils::processCmdLine);
}
- @Test
- public void testLoad() {
- //given
- EventProcessor ec = new EventProcessor(mock(EventPublisher.class), properties);
- ec.event = new org.json.JSONObject(ev);
- //when
- ec.overrideEvent();
-
- //then
- Boolean hasSourceNameNode = ec.event.getJSONObject("event").getJSONObject("commonEventHeader").has("sourceName");
- assertTrue(hasSourceNameNode);
- }
-
@Test
public void shouldParseJsonEvents() throws ReflectiveOperationException {
//given