Dynamic partition-key support 71/110171/1
authorVijay Venkatesh Kumar <vv770d@att.com>
Tue, 14 Jul 2020 18:34:45 +0000 (18:34 +0000)
committerVijay Venkatesh Kumar <vv770d@att.com>
Tue, 14 Jul 2020 18:35:19 +0000 (18:35 +0000)
Change-Id: I6c5faf4a610813e1382da55f62b5a0f9da6db506
Signed-off-by: Vijay Venkatesh Kumar <vv770d@att.com>
Issue-ID: DCAEGEN2-1484

Changelog.md
pom.xml
src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
version.properties

index a87250e..02c7261 100644 (file)
@@ -16,3 +16,5 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
            - WebMvcConfig 
 ## [1.7.0] - 09/07/2020
         - [DCAEGEN2-2254](https://jira.onap.org/browse/DCAEGEN2-2254) - Update schema to CommonEventFormat_30.2_ONAP in the eventListerner/v7 interface
+## [1.7.1] - 13/07/2020
+        - [DCAEGEN2-1484](https://jira.onap.org/browse/DCAEGEN2-1484) - VESCollector DMaap publish optimization
diff --git a/pom.xml b/pom.xml
index c2ded80..dcf2628 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
   </parent>\r
   <groupId>org.onap.dcaegen2.collectors.ves</groupId>\r
   <artifactId>VESCollector</artifactId>\r
-  <version>1.7.0-SNAPSHOT</version>\r
+  <version>1.7.1-SNAPSHOT</version>\r
   <name>dcaegen2-collectors-ves</name>\r
   <description>VESCollector</description>\r
   <properties>\r
index b00b274..3fc9e25 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * org.onap.dcaegen2.collectors.ves
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
  * Copyright (C) 2018 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,6 +42,9 @@ import static org.onap.dcae.common.publishing.VavrUtils.f;
 class DMaaPEventPublisher implements EventPublisher {
     private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
     private static final String VES_UNIQUE_ID = "VESuniqueId";
+    private static final String EVENT = "event";
+    private static final String COMMON_EVENT_HEADER = "commonEventHeader";
+    private static final String PARTITION_KEY = "sourceName";
     private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
     private final DMaaPPublishersCache publishersCache;
     private final Logger outputLogger;
@@ -73,7 +76,9 @@ class DMaaPEventPublisher implements EventPublisher {
 
     private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
             throws IOException {
-        int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
+       
+       String pk = event.getJSONObject(EVENT).getJSONObject(COMMON_EVENT_HEADER).get(PARTITION_KEY).toString();
+        int pendingMsgs = publisher.send(pk, event.toString());
         if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
             log.info("Pending messages count: " + pendingMsgs);
         }
index 809ac99..45cdf28 100644 (file)
@@ -3,6 +3,7 @@
  * org.onap.dcaegen2.collectors.ves
  * ================================================================================
  * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2020 AT&T. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -52,26 +53,28 @@ public class DMaaPEventPublisherTest {
     @Test
     public void shouldSendEventToTopic() throws Exception {
         // given
-        JSONObject event = new JSONObject("{}");
+        JSONObject event = new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
+
 
         // when
         eventPublisher.sendEvent(event, STREAM_ID);
 
         // then
-        verify(cambriaPublisher).send("MyPartitionKey", event.toString());
+        verify(cambriaPublisher).send("dns01cmd004", event.toString());
     }
+    
 
     @Test
     public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
         // given
         JSONObject event = new JSONObject(
-            "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\", \"another\": 8}");
+            "{\"VESuniqueId\": \"362e0146-ec5f-45f3-8d8f-bfe877c3f58e\",\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"); 
 
         // when
         eventPublisher.sendEvent(event, STREAM_ID);
 
         // then
-        verify(cambriaPublisher).send("MyPartitionKey", new JSONObject("{\"another\": 8}").toString());
+        verify(cambriaPublisher).send("dns01cmd004", new JSONObject("{\"event\":{\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\",\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\",\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}").toString());
     }
 
     @Test
@@ -86,4 +89,4 @@ public class DMaaPEventPublisherTest {
         // then
         verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
     }
-}
\ No newline at end of file
+}
index 24828c2..cedaba1 100644 (file)
@@ -1,6 +1,6 @@
 major=1
 minor=7
-patch=0
+patch=1
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT