Replace cambria with DmaaP client 04/122004/1
authorMaciej Malewski <maciej.malewski@nokia.com>
Tue, 8 Jun 2021 07:04:48 +0000 (09:04 +0200)
committerMaciej Malewski <maciej.malewski@nokia.com>
Thu, 17 Jun 2021 08:03:49 +0000 (10:03 +0200)
- remove cambria, add DmaaP client
- sending event for many topics at once is no longer supported
- add backward compatibility status codes
- add additional validation for batchEvent

Issue-ID: DCAEGEN2-1483
Signed-off-by: Maciej Malewski <maciej.malewski@nokia.com>
Change-Id: I945c38b4ab04b697ecfabd5ce38502f83fa70d1a

55 files changed:
Changelog.md
README.md
dpo/data-formats/ves-dmaap-config.json [deleted file]
etc/DmaapConfig.json [deleted file]
etc/collector.properties
etc/ves-dmaap-config.json [new file with mode: 0644]
pom.xml
src/main/java/org/onap/dcae/ApplicationSettings.java
src/main/java/org/onap/dcae/common/EventSender.java
src/main/java/org/onap/dcae/common/VESLogger.java [deleted file]
src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java [new file with mode: 0644]
src/main/java/org/onap/dcae/common/model/InternalException.java [new file with mode: 0644]
src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java [new file with mode: 0644]
src/main/java/org/onap/dcae/common/model/VesEvent.java
src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java [deleted file]
src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java [deleted file]
src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java [new file with mode: 0644]
src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java [new file with mode: 0644]
src/main/java/org/onap/dcae/common/publishing/Publisher.java [new file with mode: 0644]
src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java [new file with mode: 0644]
src/main/java/org/onap/dcae/common/validator/GeneralEventValidator.java
src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java [new file with mode: 0644]
src/main/java/org/onap/dcae/restapi/ApiException.java
src/main/java/org/onap/dcae/restapi/VesRestController.java
src/main/resources/log4j2.xml
src/test/java/org/onap/dcae/ApplicationSettingsTest.java
src/test/java/org/onap/dcae/TLSTest.java
src/test/java/org/onap/dcae/common/EventSenderTest.java
src/test/java/org/onap/dcae/common/publishing/DMaaPConfigurationParserTest.java
src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java [deleted file]
src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java [deleted file]
src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java [new file with mode: 0644]
src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java [new file with mode: 0644]
src/test/java/org/onap/dcae/common/publishing/PublisherTest.java [new file with mode: 0644]
src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java [new file with mode: 0644]
src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java [new file with mode: 0644]
src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java [new file with mode: 0644]
src/test/java/org/onap/dcae/restapi/ApiAuthInterceptionTest.java
src/test/java/org/onap/dcae/restapi/VesRestControllerTest.java
src/test/java/org/onap/dcae/vestest/TestVESLogger.java [deleted file]
src/test/resources/dmaap-msg-router/MsgRtrApi.properties [new file with mode: 0644]
src/test/resources/dmaap-msg-router/cadi.properties [new file with mode: 0644]
src/test/resources/dmaap-msg-router/logback.xml [new file with mode: 0644]
src/test/resources/dmaap-msg-router/message-router-compose.yml [new file with mode: 0644]
src/test/resources/dmaap-msg-router/zk_client_jaas.conf [new file with mode: 0644]
src/test/resources/dmaap-msg-router/zk_server_jaas.conf [new file with mode: 0644]
src/test/resources/testParseDMaaPCredentialsLegacy.json [deleted file]
src/test/resources/testParseDMaaPLegacy.json [deleted file]
src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json [new file with mode: 0644]
src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json [new file with mode: 0644]
src/test/resources/ves7_batch_valid_two_different_domain.json [new file with mode: 0644]
version.properties

index 2dd8933..6e107d7 100644 (file)
@@ -55,3 +55,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 ## [1.9.2] - 14/05/2021
          -  [DCAEGEN2-2683](https://jira.onap.org/browse/DCAEGEN2-2683) - Enable Spring Prometheus metrics end-point in VES
             Temporary add mvn profile for enabling/disabling Prometheus metrics            
+## [1.10.0] - 11/06/2021
+         -  [DCAEGEN2-1483](https://jira.onap.org/browse/DCAEGEN2-1483) - VESCollector Event ordering
+            - remove cambria, add DmaaP client
+            - sending event for many topics at once is no longer supported
+            - add backward compatibility status codes
+            - add additional validation for batchEvent            
index a1733a2..407fc7a 100644 (file)
--- a/README.md
+++ b/README.md
@@ -154,6 +154,14 @@ To fetch configuration from Consul, VES collector uses CBS client from DCAE SDK.
 
 Sample configuration of VESCollector K-V store can be found under /dpo/data-formats/ConsulConfig.json
 
+### How to send event locally
+
+1. In /etc/hosts add: 127.0.0.1 onap-dmaap
+2. Go into: ./src/test/resources/dmaap-msg-router
+3. Run: docker-compose -f message-router-compose.yml up -d
+4. Run ves application
+5. Now you can send events to ves
+6. Check topics on message-router: curl http://127.0.0.1:3904/topics
 
 ### Testing
 
diff --git a/dpo/data-formats/ves-dmaap-config.json b/dpo/data-formats/ves-dmaap-config.json
deleted file mode 100644 (file)
index c3b4b80..0000000
+++ /dev/null
@@ -1,198 +0,0 @@
-{
-  "ves_syslog": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_statechange": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_thresholdCrossingAlert": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_heartbeat": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_other": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_mobileflow": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_sipsignaling": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_voicequality": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_fault": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_measurement": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_3gpp_fault_supervision": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_3gpp_provisioning": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_3gpp_heartbeat": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_3gpp_performance_assurance": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_syslog_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_statechange_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_thresholdCrossingAlert_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_heartbeat_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_other_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_mobileflow_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_sipsignaling_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_voicequality_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_fault_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_measurement_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_3gpp_fault_supervision_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_3gpp_provisioning_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_3gpp_heartbeat_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  },
-  "ves_3gpp_performance_assurance_secondary": {
-    "type": "message_router",
-    "dmaap_info": {
-      "location": "mtl5",
-      "topic_url": "http://dmaap-mr-hostname:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  }
-}
diff --git a/etc/DmaapConfig.json b/etc/DmaapConfig.json
deleted file mode 100644 (file)
index 2ef77dc..0000000
+++ /dev/null
@@ -1,84 +0,0 @@
-{
-  "channels": [
-    {
-      "name": "ves-measurement",
-      "cambria.topic": "unauthenticated.SEC_MEASUREMENT_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-fault",
-      "cambria.topic": "unauthenticated.SEC_FAULT_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-heartbeat",
-      "cambria.topic": "unauthenticated.SEC_FAULT_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-other",
-      "cambria.topic": "unauthenticated.SEC_OTHER_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-notification",
-      "cambria.topic": "unauthenticated.VES_NOTIFICATION_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-pnfRegistration",
-      "cambria.topic": "unauthenticated.VES_PNFREG_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-3gpp-fault-supervision",
-      "cambria.topic": "unauthenticated.SEC_3GPP_FAULTSUPERVISION_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-3gpp-provisioning",
-      "cambria.topic": "unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-3gpp-heartbeat",
-      "cambria.topic": "unauthenticated.SEC_3GPP_HEARTBEAT_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    },
-    {
-      "name": "ves-3gpp-performance-assurance",
-      "cambria.topic": "unauthenticated.unauthenticated.SEC_3GPP_PERFORMANCEASSURANCE_OUTPUT",
-      "class": "HpCambriaOutputStream",
-      "stripHpId": "true",
-      "type": "out",
-      "cambria.hosts": "onap-dmaap"
-    }
-  ]
-}
index 8861fb8..e810761 100755 (executable)
@@ -63,7 +63,7 @@ event.externalSchema.stndDefinedDataPath=$.event.stndDefinedFields.data
 \r
 ## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile\r
 collector.dmaap.streamid=fault=ves-fault|syslog=ves-syslog|heartbeat=ves-heartbeat|measurementsForVfScaling=ves-measurement|mobileFlow=ves-mobileflow|other=ves-other|stateChange=ves-statechange|thresholdCrossingAlert=ves-thresholdCrossingAlert|voiceQuality=ves-voicequality|sipSignaling=ves-sipsignaling|notification=ves-notification|pnfRegistration=ves-pnfRegistration|3GPP-FaultSupervision=ves-3gpp-fault-supervision|3GPP-Heartbeat=ves-3gpp-heartbeat|3GPP-Provisioning=ves-3gpp-provisioning|3GPP-PerformanceAssurance=ves-3gpp-performance-assurance\r
-collector.dmaapfile=./etc/DmaapConfig.json\r
+collector.dmaapfile=./etc/ves-dmaap-config.json\r
 \r
 ## Path to the file containing description of api versions\r
 collector.description.api.version.location=etc/api_version_description.json\r
@@ -75,3 +75,9 @@ event.transform.flag=1
 \r
 # Describes at what frequency (measured in minutes) should application try to fetch config from CBS\r
 collector.dynamic.config.update.frequency=5\r
+\r
+# Response compatibility (set to None to turn off ves 7.2 compatibility)\r
+# v7.2 response codes: 202, 500\r
+# None response code: 200, 413, 503\r
+collector.response.compatibility=v7.2\r
+\r
diff --git a/etc/ves-dmaap-config.json b/etc/ves-dmaap-config.json
new file mode 100644 (file)
index 0000000..b873112
--- /dev/null
@@ -0,0 +1,205 @@
+{
+  "ves-syslog": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-statechange": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-thresholdCrossingAlert": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-heartbeat": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_HEARTBEAT_OUTPUT/"
+    }
+  },
+  "ves-other": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_OTHER_OUTPUT/"
+    }
+  },
+  "ves-mobileflow": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-sipsignaling": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-voicequality": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-fault": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_FAULT_OUTPUT/"
+    }
+  },
+  "ves-measurement": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.VES_MEASUREMENT_OUTPUT/"
+    }
+  },
+  "ves-3gpp-fault-supervision": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_3GPP_FAULTSUPERVISION_OUTPUT/"
+    }
+  },
+  "ves-3gpp-provisioning": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_3GPP_PROVISIONING_OUTPUT/"
+    }
+  },
+  "ves-3gpp-heartbeat": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_3GPP_HEARTBEAT_OUTPUT/"
+    }
+  },
+  "ves-3gpp-performance-assurance": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_3GPP_PERFORMANCEASSURANCE_OUTPUT/"
+    }
+  },
+  "ves-syslog-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-statechange-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-thresholdCrossingAlert-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-heartbeat-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-other-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-mobileflow-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-sipsignaling-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTSves7_valid_ip_v4.json-DEV"
+    }
+  },
+  "ves-voicequality-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-fault-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-measurement-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-3gpp-fault-supervision-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_3GPP_FAULTSUPERVISION_OUTPUT"
+    }
+  },
+  "ves-3gpp-provisioning-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.SEC_3GPP_FAULTSUPERVISION_OUTPUT"
+    }
+  },
+  "ves-3gpp-heartbeat-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-3gpp-performance-assurance-secondary": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"
+    }
+  },
+  "ves-pnfRegistration": {
+    "type": "message_router",
+    "dmaap_info": {
+      "location": "mtl5",
+      "topic_url": "http://onap-dmaap:3904/events/unauthenticated.VES_PNFREG_OUTPUT/"
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index dd8bbc2..665c5b3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
   </parent>\r
   <groupId>org.onap.dcaegen2.collectors.ves</groupId>\r
   <artifactId>VESCollector</artifactId>\r
-  <version>1.9.2-SNAPSHOT</version>\r
+  <version>1.10.0-SNAPSHOT</version>\r
   <name>dcaegen2-collectors-ves</name>\r
   <description>VESCollector</description>\r
   <properties>\r
     <maven-project-info-reports-plugin.version>2.9</maven-project-info-reports-plugin.version>\r
     <maven-surefire-plugin.version>3.0.0-M1</maven-surefire-plugin.version>\r
     <docker-maven-plugin.version>1.2.0</docker-maven-plugin.version>\r
+    <json-simple.version>1.1.1</json-simple.version>\r
     <json-schema-validator.version>1.0.49</json-schema-validator.version>\r
     <gson.version>2.8.6</gson.version>\r
     <json.version>20210307</json.version>\r
-    <cambriaClient.version>0.0.1</cambriaClient.version>\r
+    <unirest-java.version>1.4.9</unirest-java.version>\r
+    <commons-collections.version>3.2.2</commons-collections.version>\r
     <commons-configuration.version>1.10</commons-configuration.version>\r
     <vavr.version>0.10.3</vavr.version>\r
     <spring-boot-starter-log4j2.version>2.4.3</spring-boot-starter-log4j2.version>\r
     <spring-boot-starter-test.version>2.2.13.RELEASE</spring-boot-starter-test.version>\r
     <sdk.version>1.8.0</sdk.version>\r
     <guava.version>30.1-jre</guava.version>\r
+    <mock-server.version>5.11.1</mock-server.version>\r
+    <dmaap-client.version>1.8.6</dmaap-client.version>\r
+    <reactor-test.version>3.4.0</reactor-test.version>\r
+    <testcontainers.version>1.15.1</testcontainers.version>\r
+    <junit-jupiter.version>1.15.1</junit-jupiter.version>\r
   </properties>\r
   <build>\r
     <pluginManagement>\r
     </dependency>\r
     <!-- REST API RELATED -->\r
     <dependency>\r
-      <groupId>com.att.nsa</groupId>\r
-      <artifactId>cambriaClient</artifactId>\r
-      <version>${cambriaClient.version}</version>\r
+      <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>\r
+      <artifactId>dmaap-client</artifactId>\r
+      <version>${dmaap-client.version}</version>\r
+      <exclusions>\r
+        <exclusion>\r
+          <groupId>ch.qos.logback</groupId>\r
+          <artifactId>logback-classic</artifactId>\r
+        </exclusion>\r
+      </exclusions>\r
+    </dependency>\r
+    <dependency>\r
+      <groupId>io.projectreactor</groupId>\r
+      <artifactId>reactor-test</artifactId>\r
+      <version>${reactor-test.version}</version>\r
+      <scope>test</scope>\r
+    </dependency>\r
+    <dependency>\r
+      <groupId>org.testcontainers</groupId>\r
+      <artifactId>testcontainers</artifactId>\r
+      <version>${testcontainers.version}</version>\r
+    </dependency>\r
+    <dependency>\r
+      <groupId>org.testcontainers</groupId>\r
+      <artifactId>junit-jupiter</artifactId>\r
+      <version>${junit-jupiter.version}</version>\r
+    </dependency>\r
+    <dependency>\r
+      <groupId>com.mashape.unirest</groupId>\r
+      <artifactId>unirest-java</artifactId>\r
+      <version>${unirest-java.version}</version>\r
+    </dependency>\r
+    <!-- MISCELLANEOUS -->\r
+    <dependency>\r
+      <groupId>commons-collections</groupId>\r
+      <artifactId>commons-collections</artifactId>\r
+      <version>${commons-collections.version}</version>\r
     </dependency>\r
     <dependency>\r
       <groupId>commons-configuration</groupId>\r
       <version>${springfox-swagger2.version}</version>\r
       <scope>compile</scope>\r
     </dependency>\r
+    <dependency>\r
+      <groupId>org.mock-server</groupId>\r
+      <artifactId>mockserver-junit-jupiter</artifactId>\r
+      <version>${mock-server.version}</version>\r
+      <scope>test</scope>\r
+    </dependency>\r
     <dependency>\r
       <groupId>org.assertj</groupId>\r
       <artifactId>assertj-core</artifactId>\r
index 7bdef65..0acbbe2 100644 (file)
@@ -1,9 +1,9 @@
 /*
  * ============LICENSE_START=======================================================
- * PROJECT
+ * VES Collector
  * ================================================================================
  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 - 2020 Nokia. All rights reserved.s
+ * Copyright (C) 2018 - 2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,8 +21,6 @@
 
 package org.onap.dcae;
 
-import static java.lang.String.format;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
@@ -30,26 +28,32 @@ import com.networknt.schema.JsonSchema;
 import io.vavr.Function1;
 import io.vavr.collection.HashMap;
 import io.vavr.collection.Map;
-import java.io.FileReader;
-import java.io.IOException;
-import java.lang.reflect.Type;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.onap.dcae.common.EventTransformation;
 import org.onap.dcae.common.configuration.AuthMethodType;
+import org.onap.dcae.multiplestreamreducer.MultipleStreamReducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+import static java.lang.String.format;
+
 /**
  * Abstraction over application configuration.
  * Its job is to provide easily discoverable (by method names lookup) and type safe access to configuration properties.
  */
 public class ApplicationSettings {
 
+    public static String responseCompatibility;
+
     private static final String EVENT_TRANSFORM_FILE_PATH = "./etc/eventTransform.json";
     private static final String COULD_NOT_FIND_FILE = "Couldn't find file " + EVENT_TRANSFORM_FILE_PATH;
 
@@ -62,6 +66,7 @@ public class ApplicationSettings {
     private final PropertiesConfiguration properties = new PropertiesConfiguration();
     private final Map<String, JsonSchema> loadedJsonSchemas;
     private final List<EventTransformation> eventTransformations;
+    private final MultipleStreamReducer multipleStreamReducer = new MultipleStreamReducer();
 
     public ApplicationSettings(String[] args, Function1<String[], Map<String, String>> argsParser) {
         this(args, argsParser, System.getProperty("user.dir"));
@@ -78,6 +83,7 @@ public class ApplicationSettings {
                 format("{\"%s\":\"etc/CommonEventFormat_28.4.1.json\"}", FALLBACK_VES_VERSION));
         loadedJsonSchemas = new JSonSchemasSupplier().loadJsonSchemas(collectorSchemaFile);
         eventTransformations = loadEventTransformations();
+        responseCompatibility = getResponseCompatibilityFlag();
     }
 
     /**
@@ -155,7 +161,7 @@ public class ApplicationSettings {
     }
 
     public String dMaaPConfigurationFileLocation() {
-        return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/DmaapConfig.json"));
+        return prependWithUserDirOnRelative(properties.getString("collector.dmaapfile", "etc/ves-dmaap-config.json"));
     }
 
     public String certSubjectMatcher(){
@@ -166,13 +172,9 @@ public class ApplicationSettings {
         return properties.getString("auth.method", AuthMethodType.NO_AUTH.value());
     }
 
-    public Map<String, String[]> getDmaapStreamIds() {
+    public Map<String, String> getDmaapStreamIds() {
         String streamIdsProperty = properties.getString("collector.dmaap.streamid", null);
-        if (streamIdsProperty == null) {
-            return HashMap.empty();
-        } else {
-            return convertDMaaPStreamsPropertyToMap(streamIdsProperty);
-        }
+        return streamIdsProperty == null ? HashMap.empty() : reduceStream(streamIdsProperty);
     }
 
     public boolean getExternalSchemaValidationCheckflag() {
@@ -203,6 +205,10 @@ public class ApplicationSettings {
         return properties.getString("collector.description.api.version.location", "etc/api_version_description.json");
     }
 
+    private String getResponseCompatibilityFlag() {
+        return properties.getString("collector.response.compatibility", "v7.2");
+    }
+
     private void loadPropertiesFromFile() {
         try {
             properties.load(configurationFileLocation);
@@ -261,6 +267,13 @@ public class ApplicationSettings {
         }
     }
 
+    private Map<String, String> reduceStream(String streamIdsProperty) {
+        Map<String, String[]> dMaaPStreamsProperty = convertDMaaPStreamsPropertyToMap(streamIdsProperty);
+        final Map<String, String> domainToStreamConfig = multipleStreamReducer.reduce(dMaaPStreamsProperty);
+        log.warn(multipleStreamReducer.getDomainToStreamsInfo(domainToStreamConfig));
+        return domainToStreamConfig;
+    }
+
     @VisibleForTesting
     String getStringDirectly(String key) {
         return properties.getString(key);
index 81c463d..400597f 100644 (file)
@@ -3,7 +3,7 @@
  * VES Collector
  * ================================================================================
  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  */
 package org.onap.dcae.common;
 
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
+
 import io.vavr.collection.Map;
 import org.onap.dcae.common.model.VesEvent;
 import org.onap.dcae.common.publishing.DMaaPEventPublisher;
+import org.onap.dcae.restapi.EventValidatorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
 
 import java.util.List;
 
-public class EventSender {
+import static org.onap.dcae.restapi.ApiException.DOMAIN_NOT_DEFINED_FOR_STREAM_ID;
 
-  private static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
-  private Map<String, String[]> streamIdToDmaapIds;
-  private DMaaPEventPublisher eventPublisher;
-  private static final Logger log = LoggerFactory.getLogger(EventSender.class);
 
-  public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String[]> streamIdToDmaapIds) {
-    this.eventPublisher = eventPublisher;
-    this.streamIdToDmaapIds = streamIdToDmaapIds;
-  }
+public class EventSender {
 
-  public void send(List<VesEvent> vesEvents) {
-    for (VesEvent vesEvent : vesEvents) {
-      metriclog.info("EVENT_PUBLISH_START");
-      setLoggingContext(vesEvent);
-      streamIdToDmaapIds.get(vesEvent.getStreamId())
-          .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + vesEvent.asJsonObject()))
-          .forEach(dmaapIds -> sendEventsToStreams(vesEvent, dmaapIds));
-      log.debug("Message published" + vesEvent.asJsonObject());
-    }
-    log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
-    metriclog.info("EVENT_PUBLISH_END");
-  }
+    private Map<String, String> streamIdToDmaapIds;
+    private DMaaPEventPublisher eventPublisher;
+    private static final Logger log = LoggerFactory.getLogger(EventSender.class);
 
-  private void sendEventsToStreams(VesEvent vesEvent, String[] dmaapIds) {
-    for (String dmaapId : dmaapIds) {
-      log.info("Invoking publisher for streamId/domain:" + dmaapId);
-      eventPublisher.sendEvent(vesEvent, dmaapId);
+    public EventSender(DMaaPEventPublisher eventPublisher, Map<String, String> streamIdToDmaapIds) {
+        this.eventPublisher = eventPublisher;
+        this.streamIdToDmaapIds = streamIdToDmaapIds;
     }
-  }
 
-  private void setLoggingContext(VesEvent vesEvent) {
-    LoggingContext localLC = VESLogger.getLoggingContextForThread(vesEvent.getUniqueId().toString());
-    localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-    log.debug("event.VESuniqueId" + vesEvent.getUniqueId() + "event.commonEventHeader.domain:" + vesEvent.getDomain());
-  }
+    public HttpStatus send(List<VesEvent> vesEvents) {
+        String topic = streamIdToDmaapIds
+                .get(vesEvents.get(0).getStreamId())
+                .getOrElse(() -> {
+                    log.error("No StreamID defined for publish - Message dropped " + vesEvents.get(0).asJsonObject());
+                    throw new EventValidatorException(DOMAIN_NOT_DEFINED_FOR_STREAM_ID);
+                });
+        return eventPublisher.sendEvent(vesEvents, topic);
+    }
 }
diff --git a/src/main/java/org/onap/dcae/common/VESLogger.java b/src/main/java/org/onap/dcae/common/VESLogger.java
deleted file mode 100644 (file)
index 1072fb5..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- *                                             reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.common;
-
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.LoggingContextFactory;
-import com.att.nsa.logging.log4j.EcompFields;
-import java.util.UUID;
-
-public class VESLogger {
-
-
-    public static final String REQUEST_ID = "requestId";
-
-    // Common LoggingContext
-    private static LoggingContext commonLC;
-    // Thread-specific LoggingContext
-    private static LoggingContext threadLC;
-
-    /**
-     * Returns the common LoggingContext instance that is the base context for
-     * all subsequent instances.
-     *
-     * @return the common LoggingContext
-     */
-    public static LoggingContext getCommonLoggingContext() {
-        if (commonLC == null) {
-            commonLC = new LoggingContextFactory.Builder().build();
-            final UUID uuid = UUID.randomUUID();
-
-            commonLC.put(REQUEST_ID, uuid.toString());
-        }
-        return commonLC;
-    }
-
-    /**
-     * Get a logging context for the current thread that's based on the common
-     * logging context. Populate the context with context-specific values.
-     *
-     * @param aUuid uuid for request id
-     * @return a LoggingContext for the current thread
-     */
-    public static LoggingContext getLoggingContextForThread(UUID aUuid) {
-        // note that this operation requires everything from the common context
-        // to be (re)copied into the target context. That seems slow, but it
-        // actually
-        // helps prevent the thread from overwriting supposedly common data. It
-        // also
-        // should be fairly quick compared with the overhead of handling the
-        // actual
-        // service call.
-
-        threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
-        // Establish the request-specific UUID, as long as we are here...
-        threadLC.put(REQUEST_ID, aUuid.toString());
-        threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
-
-        return threadLC;
-    }
-
-    /**
-     * Get a logging context for the current thread that's based on the common
-     * logging context. Populate the context with context-specific values.
-     *
-     * @param aUuid uuid for request id
-     * @return a LoggingContext for the current thread
-     */
-    public static LoggingContext getLoggingContextForThread(String aUuid) {
-        // note that this operation requires everything from the common context
-        // to be (re)copied into the target context. That seems slow, but it
-        // actually
-        // helps prevent the thread from overwriting supposedly common data. It
-        // also
-        // should be fairly quick compared with the overhead of handling the
-        // actual
-        // service call.
-
-        threadLC = new LoggingContextFactory.Builder().withBaseContext(getCommonLoggingContext()).build();
-        // Establish the request-specific UUID, as long as we are here...
-        threadLC.put(REQUEST_ID, aUuid);
-        threadLC.put("statusCode", "COMPLETE");
-        threadLC.put(EcompFields.kEndTimestamp, SaClock.now());
-        return threadLC;
-    }
-
-}
diff --git a/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java b/src/main/java/org/onap/dcae/common/model/BackwardsCompatibilityException.java
new file mode 100644 (file)
index 0000000..aab3c44
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.model;
+
+public class BackwardsCompatibilityException extends RuntimeException {
+}
\ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/common/model/InternalException.java b/src/main/java/org/onap/dcae/common/model/InternalException.java
new file mode 100644 (file)
index 0000000..da93e5d
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.model;
+
+import org.onap.dcae.restapi.ApiException;
+
+public class InternalException extends RuntimeException {
+
+    private final ApiException apiException;
+
+    public InternalException(ApiException apiException) {
+        this.apiException = apiException;
+    }
+
+    public ApiException getApiException() {
+        return apiException;
+    }
+}
diff --git a/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java b/src/main/java/org/onap/dcae/common/model/PayloadToLargeException.java
new file mode 100644 (file)
index 0000000..e82ad77
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.model;
+
+public class PayloadToLargeException extends RuntimeException {
+}
index 8e2db80..8841955 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * VES Collector
  * ================================================================================
- * Copyright (C) 2020 Nokia. All rights reserved.s
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,7 +22,9 @@ package org.onap.dcae.common.model;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.json.JSONException;
 import org.json.JSONObject;
+import java.util.Optional;
 
 /**
  * This class is a wrapper for JSONObject, that represents VES event.
@@ -111,6 +113,16 @@ public class VesEvent {
         return event.get(VES_UNIQUE_ID);
     }
 
+    /**
+     * Returns optional stndDefinedNamespace name from VES event.
+     *
+     * @return Optional stndDefinedNamespace
+     */
+    public Optional<String> getStndDefinedNamespace() throws JSONException {
+        return isStdDefinedDomain(getDomain()) ? Optional.ofNullable(getEventHeader())
+                .map(header -> header.getString(STND_DEFINED_NAMESPACE)) : Optional.empty();
+    }
+
     /**
      * Checks if type of event is same as given in paramaters.
      *
index 274e449..9f8ffcc 100644 (file)
@@ -63,28 +63,10 @@ public final class DMaaPConfigurationParser {
     }
 
     private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
-        return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
+        return Try(() -> parseNewFormat(config))
                 .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
     }
 
-    private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
-        return dMaaPConfig.has("channels");
-    }
-
-    private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
-        return root.get("channels").toList().toMap(
-                channel -> channel.get("name").toString(),
-                channel -> {
-                    String destinationsStr = channel.getAsOption("cambria.url")
-                            .getOrElse(channel.getAsOption("cambria.hosts").get())
-                            .toString();
-                    String topic = channel.get("cambria.topic").toString();
-                    Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
-                    Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
-                    List<String> destinations = List(destinationsStr.split(","));
-                    return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
-                });
-    }
 
     private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
         return root.keys().toMap(
index 2b4cfc1..08e16e0 100644 (file)
@@ -3,7 +3,7 @@
  * org.onap.dcaegen2.collectors.ves
  * ================================================================================
  * Copyright (C) 2017,2020 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcae.common.publishing;
 
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
 import io.vavr.collection.Map;
-import io.vavr.control.Try;
-import org.onap.dcae.common.VESLogger;
 import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import reactor.core.publisher.Flux;
 
-import java.io.IOException;
-
-import static org.onap.dcae.common.publishing.VavrUtils.f;
+import java.util.List;
+import java.util.Objects;
 
+import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus;
 /**
  * @author Pawel Szalapski (pawel.szalapski@nokia.com)
  */
 public class DMaaPEventPublisher {
-    private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
     private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
-    private DMaaPPublishersCache publishersCache;
-    private final Logger outputLogger = LoggerFactory.getLogger("org.onap.dcae.common.output");
-
-    DMaaPEventPublisher(DMaaPPublishersCache publishersCache) {
-        this.publishersCache = publishersCache;
-    }
+    private Map<String, PublisherConfig> dMaaPConfig;
+    private final Publisher dmaapPublisher;
 
     public DMaaPEventPublisher(Map<String, PublisherConfig> dMaaPConfig) {
-        this(new DMaaPPublishersCache(dMaaPConfig));
+        this.dMaaPConfig = dMaaPConfig;
+        dmaapPublisher = new Publisher();
     }
 
     /**
@@ -58,48 +51,29 @@ public class DMaaPEventPublisher {
      * @param dmaapConfiguration Dmaap configuration
      */
     public void reload(Map<String, PublisherConfig> dmaapConfiguration){
-        this.publishersCache = new DMaaPPublishersCache(dmaapConfiguration);
-    }
-
-    public void sendEvent(VesEvent vesEvent, String dmaapId){
-        clearVesUniqueIdFromEvent(vesEvent);
-        publishersCache.getPublisher(dmaapId)
-                .onEmpty(() ->
-                        log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", dmaapId, vesEvent)))
-                .forEach(publisher -> sendEvent(vesEvent, dmaapId, publisher));
-    }
-
-    private void sendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher) {
-        Try.run(() -> uncheckedSendEvent(event, dmaapId, publisher))
-                .onFailure(exc -> closePublisher(event, dmaapId, exc));
+        dMaaPConfig = dmaapConfiguration;
+        log.info("reload dmaap configuration");
     }
 
-    private void uncheckedSendEvent(VesEvent event, String dmaapId, CambriaBatchingPublisher publisher)
-            throws IOException {
-
-        String pk = event.getPK();
-        int pendingMsgs = publisher.send(pk, event.asJsonObject().toString());
-        if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
-            log.info("Pending messages count: " + pendingMsgs);
-        }
-        String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, dmaapId);
-        log.info(infoMsg);
-        outputLogger.info(infoMsg);
+    public HttpStatus sendEvent(List<VesEvent> vesEvents, String dmaapId) {
+        clearVesUniqueIdFromEvent(vesEvents);
+        io.vavr.collection.List<String> events = mapListOfEventsToVavrList(vesEvents);
+        Flux<MessageRouterPublishResponse> messageRouterPublishFlux = dmaapPublisher.publishEvents(events, dMaaPConfig.get(dmaapId));
+        MessageRouterPublishResponse messageRouterPublishResponse = messageRouterPublishFlux.blockFirst();
+        return getHttpStatus(Objects.requireNonNull(messageRouterPublishResponse));
     }
 
-    private void closePublisher(VesEvent event, String dmaapId, Throwable e) {
-        log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
-                event, dmaapId), e);
-        publishersCache.closePublisherFor(dmaapId);
+    private io.vavr.collection.List<String> mapListOfEventsToVavrList(List<VesEvent> vesEvents) {
+        return io.vavr.collection.List.ofAll(vesEvents)
+                .map(event -> event.asJsonObject().toString());
     }
 
-    private void clearVesUniqueIdFromEvent(VesEvent event) {
-        if (event.hasType(VesEvent.VES_UNIQUE_ID)) {
-            String uuid = event.getUniqueId().toString();
-            LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
-            localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-            log.debug("Removing VESuniqueid object from event");
-            event.removeElement(VesEvent.VES_UNIQUE_ID);
-        }
+    private void clearVesUniqueIdFromEvent(List<VesEvent> events) {
+        events.stream()
+                .filter(event -> event.hasType(VesEvent.VES_UNIQUE_ID))
+                .forEach(event -> {
+                    log.debug("Removing VESuniqueid object from event");
+                    event.removeElement(VesEvent.VES_UNIQUE_ID);
+                });
     }
 }
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java
deleted file mode 100644 (file)
index a93073b..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
-import io.vavr.control.Try;
-
-import static io.vavr.API.Try;
-import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
-import static org.onap.dcae.common.publishing.VavrUtils.f;
-
-/**
- * @author Pawel Szalapski (pawel.szalapski@nokia.com)
- */
-final class DMaaPPublishersBuilder {
-
-    static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
-        return Try(() -> builder(config).build())
-                .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
-    }
-
-    private static PublisherBuilder builder(PublisherConfig config) {
-        if (config.isSecured()) {
-            return authenticatedBuilder(config);
-        } else {
-            return unAuthenticatedBuilder(config);
-        }
-    }
-
-    private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
-        return unAuthenticatedBuilder(config)
-                .usingHttps()
-                .authenticatedByHttp(config.userName().get(), config.password().get());
-    }
-
-    private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
-        return new CambriaClientBuilders.PublisherBuilder()
-                .usingHosts(config.destinations().mkString(","))
-                .onTopic(config.topic())
-                .logSendFailuresAfter(5);
-    }
-}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java
deleted file mode 100644 (file)
index b7997ef..0000000
+++ /dev/null
@@ -1,121 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import com.google.common.cache.*;
-import io.vavr.collection.Map;
-import io.vavr.control.Option;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static io.vavr.API.Option;
-import static org.onap.dcae.common.publishing.VavrUtils.f;
-
-/**
- * @author Pawel Szalapski (pawel.szalapski@nokia.com)
- */
-class DMaaPPublishersCache {
-
-    private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class);
-    private final LoadingCache<String, CambriaBatchingPublisher> publishersCache;
-    private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration;
-
-    DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
-        this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
-        this.publishersCache = CacheBuilder.newBuilder()
-                .removalListener(new OnPublisherRemovalListener())
-                .build(new CambriaPublishersCacheLoader());
-    }
-
-    DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
-                         OnPublisherRemovalListener onPublisherRemovalListener,
-                         Map<String, PublisherConfig> dMaaPConfiguration) {
-        this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
-        this.publishersCache = CacheBuilder.newBuilder()
-                .removalListener(onPublisherRemovalListener)
-                .build(dMaaPPublishersCacheLoader);
-    }
-
-    Option<CambriaBatchingPublisher> getPublisher(String streamID) {
-        try {
-            return Option(publishersCache.getUnchecked(streamID));
-        } catch (Exception e) {
-            log.warn("Could not create / load Cambria Publisher for streamID", e);
-            return Option.none();
-        }
-    }
-
-    void closePublisherFor(String streamId) {
-        publishersCache.invalidate(streamId);
-    }
-
-    synchronized void reconfigure(Map<String, PublisherConfig> newConfig) {
-        Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get();
-        Map<String, PublisherConfig> removedConfigurations = currentConfig
-                .filterKeys(domain -> !newConfig.containsKey(domain));
-        Map<String, PublisherConfig> changedConfigurations = newConfig
-                .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
-        dMaaPConfiguration.set(newConfig);
-        removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1));
-    }
-
-    static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> {
-
-        @Override
-        public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) {
-            CambriaBatchingPublisher publisher = notification.getValue();
-            if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull
-                try {
-                    int timeout = 20;
-                    TimeUnit unit = TimeUnit.SECONDS;
-                    java.util.List<?> stuck = publisher.close(timeout, unit);
-                    if (!stuck.isEmpty()) {
-                        log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
-                                + "%s messages were dropped", stuck.size(), timeout, unit));
-                    }
-                } catch (InterruptedException | IOException e) {
-                    log.error("Could not close Cambria publisher, some messages might have been dropped", e);
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-    }
-
-    class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> {
-
-        @Override
-        public CambriaBatchingPublisher load(@Nonnull String domain) {
-            return dMaaPConfiguration.get()
-                    .get(domain)
-                    .toTry(() -> new RuntimeException(
-                            f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
-                    .flatMap(DMaaPPublishersBuilder::buildPublisher)
-                    .get();
-        }
-    }
-
-}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java b/src/main/java/org/onap/dcae/common/publishing/DmaapRequestConfiguration.java
new file mode 100644 (file)
index 0000000..2eaeab6
--- /dev/null
@@ -0,0 +1,101 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapRetryConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapTimeoutConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import java.time.Duration;
+
+public class DmaapRequestConfiguration {
+
+    private static final Long TIMEOUT_SECONDS = 10L;
+    private static final int RETRY_INTERVAL_IN_SECONDS = 1;
+    private static final int RETRY_COUNT = 1;
+
+    private DmaapRequestConfiguration() {
+    }
+
+    static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig, Long timeout) {
+        String topicUrl = createUrl(publisherConfig);
+        return ImmutableMessageRouterPublishRequest.builder()
+                .sinkDefinition(createMessageRouterSink(topicUrl))
+                .contentType(ContentType.APPLICATION_JSON)
+                .timeoutConfig(timeOutConfiguration(timeout))
+                .build();
+    }
+
+    static MessageRouterPublishRequest createPublishRequest(Option<PublisherConfig> publisherConfig) {
+        return createPublishRequest(publisherConfig, TIMEOUT_SECONDS);
+    }
+
+    static Flux<JsonObject> jsonBatch(List<String> messages) {
+        return Flux.fromIterable(getAsJsonObjects(messages));
+    }
+
+    static MessageRouterPublisherConfig retryConfiguration() {
+        return ImmutableMessageRouterPublisherConfig.builder()
+                .retryConfig(ImmutableDmaapRetryConfig.builder()
+                        .retryIntervalInSeconds(RETRY_INTERVAL_IN_SECONDS)
+                        .retryCount(RETRY_COUNT)
+                        .build())
+                .build();
+    }
+
+    private static String createUrl(Option<PublisherConfig> publisherConfig) {
+        String hostAndPort = publisherConfig.get().getHostAndPort();
+        String topicName = publisherConfig.get().topic();
+        return String.format("http://%s/events/%s/",hostAndPort,topicName);
+    }
+
+    private static List<JsonObject> getAsJsonObjects(List<String> messages) {
+        return getAsJsonElements(messages).map(JsonElement::getAsJsonObject);
+    }
+
+    static List<JsonElement> getAsJsonElements(List<String> messages) {
+        return messages.map(JsonParser::parseString);
+    }
+
+    static ImmutableMessageRouterSink createMessageRouterSink(String topicUrl) {
+        return ImmutableMessageRouterSink.builder()
+                .name("the topic")
+                .topicUrl(topicUrl)
+                .build();
+    }
+
+    @NotNull
+    private static ImmutableDmaapTimeoutConfig timeOutConfiguration(Long timeout) {
+        return ImmutableDmaapTimeoutConfig.builder().timeout(Duration.ofSeconds(timeout)).build();
+    }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java b/src/main/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapper.java
new file mode 100644 (file)
index 0000000..b5c735b
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import org.jetbrains.annotations.NotNull;
+import org.onap.dcae.common.model.BackwardsCompatibilityException;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
+import org.onap.dcae.restapi.ApiException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+
+import java.util.Objects;
+
+import static org.onap.dcae.ApplicationSettings.responseCompatibility;
+
+public class MessageRouterHttpStatusMapper {
+
+    private static final Logger log = LoggerFactory.getLogger(MessageRouterHttpStatusMapper.class);
+
+    private MessageRouterHttpStatusMapper() {
+    }
+
+    @NotNull
+    static HttpStatus getHttpStatus(MessageRouterPublishResponse messageRouterPublishResponse) {
+        return responseCompatibility.equals("v7.2") ?
+                getHttpStatusBackwardsCompatibility(messageRouterPublishResponse):
+                getHttpStatusWithMappedResponseCode(messageRouterPublishResponse);
+    }
+
+    @NotNull
+    private static HttpStatus getHttpStatusBackwardsCompatibility(MessageRouterPublishResponse messageRouterPublishResponse) {
+        if (isHttpOk(messageRouterPublishResponse)) {
+            log.info("Successfully send event to MR");
+            return HttpStatus.ACCEPTED;
+        } else {
+            log.error(messageRouterPublishResponse.failReason());
+            throw new BackwardsCompatibilityException();
+        }
+    }
+
+    @NotNull
+    private static HttpStatus getHttpStatusWithMappedResponseCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+        if (isHttpOk(messageRouterPublishResponse)) {
+            log.info("Successfully send event to MR");
+            return HttpStatus.OK;
+        } else if (isHttp413(messageRouterPublishResponse)) {
+            log.error(messageRouterPublishResponse.failReason());
+            throw new PayloadToLargeException();
+        } else {
+            log.error(messageRouterPublishResponse.failReason());
+            throw new InternalException(responseBody(resolveHttpCode(messageRouterPublishResponse)));
+        }
+    }
+
+    @NotNull
+    private static String resolveHttpCode(MessageRouterPublishResponse messageRouterPublishResponse) {
+        return Objects.requireNonNull(messageRouterPublishResponse.failReason()).substring(0, 3);
+    }
+
+    @NotNull
+    private static ApiException responseBody(String substring) {
+        switch (substring) {
+            case "404":
+                return ApiException.NOT_FOUND;
+            case "408":
+                return ApiException.REQUEST_TIMEOUT;
+            case "429":
+                return ApiException.TOO_MANY_REQUESTS;
+            case "502":
+                return ApiException.BAD_GATEWAY;
+            case "503":
+                return ApiException.SERVICE_UNAVAILABLE;
+            case "504":
+                return ApiException.GATEWAY_TIMEOUT;
+            default:
+                return ApiException.INTERNAL_SERVER_ERROR;
+        }
+    }
+
+    private static boolean isHttpOk(MessageRouterPublishResponse messageRouterPublishResponse) {
+        return messageRouterPublishResponse.successful();
+    }
+
+    private static boolean isHttp413(MessageRouterPublishResponse messageRouterPublishResponse) {
+        return Objects.requireNonNull(messageRouterPublishResponse.failReason()).startsWith("413");
+    }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/Publisher.java b/src/main/java/org/onap/dcae/common/publishing/Publisher.java
new file mode 100644 (file)
index 0000000..1d688d8
--- /dev/null
@@ -0,0 +1,64 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.retryConfiguration;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.jsonBatch;
+
+public class Publisher {
+
+    private final MessageRouterPublisher publisher;
+
+    public Publisher() {
+        this(retryConfiguration());
+    }
+
+    public Publisher(MessageRouterPublisherConfig messageRouterPublisherConfig) {
+        publisher = DmaapClientFactory
+                .createMessageRouterPublisher(messageRouterPublisherConfig);
+    }
+
+    /**
+     * Publish event
+     *
+     * @param events list of ves events prepared to send
+     * @param publisherConfig publisher configuration
+     * @return flux containing information about the success or failure of the event publication
+     */
+    public Flux<MessageRouterPublishResponse> publishEvents(List<String> events, Option<PublisherConfig> publisherConfig) {
+        return publishEvents(events, createPublishRequest(publisherConfig));
+    }
+
+    Flux<MessageRouterPublishResponse> publishEvents(List<String> events, MessageRouterPublishRequest publishRequest) {
+        final Flux<JsonObject> jsonMessageBatch = jsonBatch(events);
+        return publisher.put(publishRequest, jsonMessageBatch);
+    }
+}
index 1fd0d31..0bb5192 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * org.onap.dcaegen2.collectors.ves
  * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018,2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@ public final class PublisherConfig {
         this.topic = topic;
     }
 
+
     PublisherConfig(List<String> destinations, String topic, String userName, String password) {
         this.destinations = destinations;
         this.topic = topic;
@@ -50,6 +51,10 @@ public final class PublisherConfig {
         return destinations;
     }
 
+    String getHostAndPort(){
+        return destinations.get(0);
+    }
+
     String topic() {
         return topic;
     }
diff --git a/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java b/src/main/java/org/onap/dcae/common/validator/BatchEventValidator.java
new file mode 100644 (file)
index 0000000..ea92074
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.validator;
+
+import io.vavr.control.Try;
+import org.json.JSONException;
+import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcae.restapi.ApiException;
+import org.onap.dcae.restapi.EventValidatorException;
+
+import java.util.List;
+
+import static java.util.stream.Collectors.toSet;
+import static org.onap.dcae.restapi.ApiException.DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT;
+import static org.onap.dcae.restapi.ApiException.DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED;
+
+public class BatchEventValidator {
+
+    private BatchEventValidator() {
+    }
+
+    /**
+     * Check if value of domain fields are the same in every event,
+     * in case of stndDefined check stndDefinedNamespace fields
+     *
+     * @param events list of checked ves events
+     * @throws EventValidatorException when domain fields value or stndDefinedNamespace fields value are note the same
+     */
+    public static void executeBatchEventValidation(List<VesEvent> events) throws EventValidatorException {
+        if (hasNotEveryEventSameDomain(events)) {
+            throw new EventValidatorException(DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT);
+        }
+        if (isDomainStndDefined(events) && hasNotSameStndDefinedNamespace(events)) {
+            throw new EventValidatorException(DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED);
+        }
+    }
+
+    private static boolean hasNotEveryEventSameDomain(List<VesEvent> events) {
+        return events.stream()
+                .map(VesEvent::getDomain)
+                .collect(toSet())
+                .size() != 1;
+    }
+
+    private static boolean hasNotSameStndDefinedNamespace(List<VesEvent> events)  {
+        return Try.of(() -> isAllStndDefinedNamespace(events))
+                .getOrElseThrow(() -> new EventValidatorException(ApiException.MISSING_NAMESPACE_PARAMETER));
+    }
+
+    private static boolean isAllStndDefinedNamespace(List<VesEvent> events) {
+        return events.stream()
+                .map(e -> e.getStndDefinedNamespace().orElse(""))
+                .collect(toSet())
+                .size() != 1;
+    }
+
+    private static boolean isDomainStndDefined(List<VesEvent> events) throws JSONException{
+        return events.stream()
+                .allMatch((e -> e.getDomain().equals("stndDefined")));
+    }
+}
index 5cd5dc2..975db79 100644 (file)
@@ -25,7 +25,6 @@ import org.onap.dcae.ApplicationSettings;
 import org.onap.dcae.common.model.VesEvent;
 import org.onap.dcae.restapi.ApiException;
 import org.onap.dcae.restapi.EventValidatorException;
-
 /**
  * This class is using ApplicationSetting and SchemaValidator to validate VES event.
  *
diff --git a/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java b/src/main/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducer.java
new file mode 100644 (file)
index 0000000..c03ab6b
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.multiplestreamreducer;
+
+import io.vavr.Tuple2;
+import io.vavr.collection.Map;
+
+public class MultipleStreamReducer {
+
+    /**
+     * Converts configuration from: "one domain many streams"
+     * to: "one domain one stream"
+     *
+     * @param map domain to streams configuration
+     * @return configuration - one domain one stream
+     */
+    public Map<String, String> reduce(Map<String, String[]> map) {
+        return map.toStream()
+                .toMap(Tuple2::_1, v -> v._2[0]);
+    }
+
+    /**
+     * Information about the current match: domain to stream
+     *
+     * @param domainToStreamConfig domain to stream configuration
+     * @return current domain to stream information
+     */
+    public String getDomainToStreamsInfo(Map<String, String> domainToStreamConfig) {
+        return domainToStreamConfig.map(v -> "Domain: " +
+                v._1 + " has active stream: " + v._2 + System.lineSeparator())
+                .reduce((a, b) -> a + b);
+    }
+}
index dbd41a4..e819cbd 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * org.onap.dcaegen2.collectors.ves
  * ================================================================================
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -40,8 +40,18 @@ public enum ApiException {
     NO_SERVER_RESOURCES(ExceptionType.SERVICE_EXCEPTION, "SVC1000", "No server resources (internal processing queue full)", 503),
     STND_DEFINED_VALIDATION_FAILED(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.data invalid against event.stndDefinedFields.schemaReference", "400"), 400),
     NO_LOCAL_SCHEMA_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2004", "Invalid input value for %1 %2: %3", List.of("attribute", "event.stndDefinedFields.schemaReference", "Referred external schema not present in schema repository"), 400),
-    INCORRECT_INTERNAL_FILE_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.schemaReference value does not correspond to any external event schema file in externalSchema repo", "400"), 400);
-
+    INCORRECT_INTERNAL_FILE_REFERENCE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("event.stndDefinedFields.schemaReference value does not correspond to any external event schema file in externalSchema repo", "400"), 400),
+    DIFFERENT_DOMAIN_FIELDS_IN_BATCH_EVENT(ExceptionType.SERVICE_EXCEPTION, "SVC0001", "Different value of domain fields in Batch Event", 400),
+    DIFFERENT_STND_DEFINED_NAMESPACE_WHEN_DOMAIN_STND_DEFINED(ExceptionType.SERVICE_EXCEPTION, "SVC0001","Value of stndDefinedNamespace fields have to be same when domain is stndDefined",400),
+    DOMAIN_NOT_DEFINED_FOR_STREAM_ID(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("No domain defined for stream id", "400"), 400),
+    PAYLOAD_TO_LARGE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Entity Too Large", "413"), 413),
+    NOT_FOUND(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Not Found","404"), 404),
+    REQUEST_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Request Timeout","408"), 408),
+    TOO_MANY_REQUESTS(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Too Many Requests","429"), 429),
+    INTERNAL_SERVER_ERROR(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Internal Server Error","500"), 500),
+    BAD_GATEWAY(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Bad Gateway","502"), 502),
+    SERVICE_UNAVAILABLE(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Service Unavailable","503"), 503),
+    GATEWAY_TIMEOUT(ExceptionType.SERVICE_EXCEPTION, "SVC2000", "The following service error occurred: %1. Error code is %2", List.of("Gateway Timeout","504"), 504);
 
     public final int httpStatusCode;
     private final ExceptionType type;
index 93e428b..6c4fb8e 100644 (file)
@@ -3,7 +3,7 @@
  * VES Collector
  * ================================================================================
  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2020 Nokia. All rights reserved.
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 package org.onap.dcae.restapi;
 
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
 import org.json.JSONObject;
 import org.onap.dcae.ApplicationSettings;
 import org.onap.dcae.common.EventSender;
 import org.onap.dcae.common.EventUpdater;
 import org.onap.dcae.common.HeaderUtils;
-import org.onap.dcae.common.validator.GeneralEventValidator;
-import org.onap.dcae.common.validator.StndDefinedDataValidator;
-import org.onap.dcae.common.VESLogger;
+import org.onap.dcae.common.model.BackwardsCompatibilityException;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
 import org.onap.dcae.common.model.StndDefinedNamespaceParameterHasEmptyValueException;
 import org.onap.dcae.common.model.StndDefinedNamespaceParameterNotDefinedException;
 import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcae.common.validator.GeneralEventValidator;
+import org.onap.dcae.common.validator.StndDefinedDataValidator;
 import org.onap.dcaegen2.services.sdk.standardization.header.CustomHeaderUtils;
 import org.slf4j.Logger;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -50,8 +50,9 @@ import javax.servlet.http.HttpServletRequest;
 import java.util.List;
 import java.util.UUID;
 
-import static org.springframework.http.ResponseEntity.accepted;
+import static org.onap.dcae.common.validator.BatchEventValidator.executeBatchEventValidation;
 import static org.springframework.http.ResponseEntity.badRequest;
+import static org.springframework.http.ResponseEntity.status;
 
 @RestController
 public class VesRestController {
@@ -113,22 +114,29 @@ public class VesRestController {
             generalEventValidator.validate(vesEvent, type, version);
             List<VesEvent> vesEvents = transformEvent(vesEvent, type, version, requestURI);
             executeStndDefinedValidation(vesEvents);
-            eventSender.send(vesEvents);
+            executeBatchEventValidation(vesEvents);
+            HttpStatus httpStatus = eventSender.send(vesEvents);
+            return status(httpStatus).contentType(MediaType.APPLICATION_JSON).body("Successfully send event");
         } catch (EventValidatorException e) {
-           logger.error(e.getMessage());
-            return ResponseEntity.status(e.getApiException().httpStatusCode)
+            logger.error(e.getMessage());
+            return status(e.getApiException().httpStatusCode)
                     .body(e.getApiException().toJSON().toString());
         } catch (StndDefinedNamespaceParameterNotDefinedException e) {
-            return ResponseEntity.status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode)
+            return status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode)
                     .body(ApiException.MISSING_NAMESPACE_PARAMETER.toJSON().toString());
         } catch (StndDefinedNamespaceParameterHasEmptyValueException e) {
-            return ResponseEntity.status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode)
+            return status(ApiException.MISSING_NAMESPACE_PARAMETER.httpStatusCode)
                     .body(ApiException.EMPTY_NAMESPACE_PARAMETER.toJSON().toString());
+        } catch (InternalException e) {
+            return status(ApiException.SERVICE_UNAVAILABLE.httpStatusCode)
+                    .body(e.getApiException().toJSON().toString());
+        } catch (PayloadToLargeException e) {
+            return status(ApiException.PAYLOAD_TO_LARGE.httpStatusCode)
+                    .body(ApiException.PAYLOAD_TO_LARGE.toJSON().toString());
+        } catch (BackwardsCompatibilityException e) {
+            return status(ApiException.INTERNAL_SERVER_ERROR.httpStatusCode)
+                    .body(ApiException.INTERNAL_SERVER_ERROR.toJSON().toString());
         }
-
-        // TODO call service and return status, replace CambriaClient, split event to single object and list of them
-        return accepted().headers(this.headerUtils.fillHeaders(headerUtils.getRspCustomHeader()))
-                .contentType(MediaType.APPLICATION_JSON).body("Accepted");
     }
 
     private void executeStndDefinedValidation(List<VesEvent> vesEvents) {
@@ -142,7 +150,6 @@ public class VesRestController {
                 headerUtils.extractHeaders(request),
                 settings.getApiVersionDescriptionFilepath(),
                 headerUtils.getRestApiIdentify(request.getRequestURI()));
-
     }
 
     private List<VesEvent> transformEvent(VesEvent vesEvent, String type, String version, String requestURI) {
@@ -151,13 +158,7 @@ public class VesRestController {
 
     private UUID generateUUID(VesEvent vesEvent, String version, String uri) {
         UUID uuid = UUID.randomUUID();
-        setUpECOMPLoggingForRequest(uuid);
         requestLogger.info(String.format(VES_EVENT_MESSAGE, vesEvent.asJsonObject(), uuid, version, uri));
         return uuid;
     }
-
-    private static void setUpECOMPLoggingForRequest(UUID uuid) {
-        LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
-        localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-    }
-}
\ No newline at end of file
+}
index 3f915d0..76d3411 100644 (file)
@@ -47,7 +47,7 @@
     </RollingFile>
 
     <RollingFile fileName="logs/eelf/audit.log" filePattern="logs/eelf/audit-%d{yyyy-MM-dd}-%i.log" name="EELF_AUDIT">
-      <LevelRangeFilter maxLevel="DEBUG" minLevel="INFO"/>
+      <LevelRangeFilter maxLevel="TRACE" minLevel="INFO"/>
       <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %m%n"/>
       <Policies>
         <SizeBasedTriggeringPolicy size="64 MB"/>
@@ -87,7 +87,7 @@
   </Appenders>
 
   <Loggers>
-    <logger additivity="false" level="error" name="org.onap.dcaegen2.services.sdk">
+    <logger additivity="true" level="trace" name="org.onap.dcaegen2.services.sdk">
       <AppenderRef ref="ROL_CONSOLE"/>
       <AppenderRef ref="EFILE"/>
     </logger>
index 6ea94ab..d587761 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * org.onap.dcaegen2.collectors.ves
  * ================================================================================
- * Copyright (C) 2018 - 2020 Nokia. All rights reserved.
+ * Copyright (C) 2018 - 2021 Nokia. All rights reserved.
  * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.networknt.schema.JsonSchema;
 import io.vavr.collection.HashMap;
 import io.vavr.collection.Map;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
@@ -37,11 +38,10 @@ import java.util.Arrays;
 import java.util.Objects;
 
 import static java.util.Collections.singletonList;
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.onap.dcae.CLIUtils.processCmdLine;
 import static org.onap.dcae.TestingUtilities.createTemporaryFile;
 
@@ -232,15 +232,6 @@ public class ApplicationSettingsTest {
         assertEquals(sanitizePath("/somewhere/dmaapFile"), dmaapConfigFileLocation);
     }
 
-    @Test
-    public void shouldReturnDefaultDMAAPConfigFileLocation() throws IOException {
-        // when
-        String dmaapConfigFileLocation = fromTemporaryConfiguration().dMaaPConfigurationFileLocation();
-
-        // then
-        assertEquals(sanitizePath("etc/DmaapConfig.json"), dmaapConfigFileLocation);
-    }
-
     @Test
     public void shouldTellIfSchemaValidationIsEnabled() throws IOException {
         // when
@@ -315,26 +306,26 @@ public class ApplicationSettingsTest {
     @Test
     public void shouldReturnDMAAPStreamId() throws IOException {
         // given
-        Map<String, String[]> expected = HashMap.of(
-                "log", new String[]{"ves-syslog", "ves-auditlog"},
-                "fault", new String[]{"ves-fault"}
+        Map<String, String> expected = HashMap.of(
+                "log", "ves-syslog",
+                "fault", "ves-fault"
         );
 
         // when
-        Map<String, String[]> dmaapStreamID = fromTemporaryConfiguration(
-                "collector.dmaap.streamid=fault=ves-fault|log=ves-syslog,ves-auditlog")
+        Map<String, String> dmaapStreamID = fromTemporaryConfiguration(
+                "collector.dmaap.streamid=fault=ves-fault,stream1|log=ves-syslog,stream2,stream3")
                 .getDmaapStreamIds();
 
         // then
-        assertArrayEquals(expected.get("log").get(), Objects.requireNonNull(dmaapStreamID).get("log").get());
-        assertArrayEquals(expected.get("fault").get(), Objects.requireNonNull(dmaapStreamID).get("fault").get());
+        assertEquals(expected.get("log").get(), Objects.requireNonNull(dmaapStreamID).get("log").get());
+        assertEquals(expected.get("fault").get(), Objects.requireNonNull(dmaapStreamID).get("fault").get());
         assertEquals(expected.keySet(), dmaapStreamID.keySet());
     }
 
     @Test
     public void shouldReturnDefaultDMAAPStreamId() throws IOException {
         // when
-        Map<String, String[]> dmaapStreamID = fromTemporaryConfiguration().getDmaapStreamIds();
+        Map<String, String> dmaapStreamID = fromTemporaryConfiguration().getDmaapStreamIds();
 
         // then
         assertEquals(dmaapStreamID, HashMap.empty());
@@ -391,24 +382,24 @@ public class ApplicationSettingsTest {
     }
 
     @Test
-    public void shouldReturnCambriaConfigurationFileLocation() throws IOException {
+    public void shouldReturnConfigurationFileLocation() throws IOException {
         // when
-        String cambriaConfigurationFileLocation = fromTemporaryConfiguration(
-                "collector.dmaapfile=/somewhere/dmaapConfig")
+        String configurationFileLocation = fromTemporaryConfiguration(
+                "collector.dmaapfile=/somewhere/etc/ves-dmaap-config.json")
                 .dMaaPConfigurationFileLocation();
 
         // then
-        assertEquals(sanitizePath("/somewhere/dmaapConfig"), cambriaConfigurationFileLocation);
+        assertEquals(sanitizePath("/somewhere/etc/ves-dmaap-config.json"), configurationFileLocation);
     }
 
     @Test
-    public void shouldReturnDefaultCambriaConfigurationFileLocation() throws IOException {
+    public void shouldReturnDefaultConfigurationFileLocation() throws IOException {
         // when
-        String cambriaConfigurationFileLocation = fromTemporaryConfiguration()
+        String configurationFileLocation = fromTemporaryConfiguration()
                 .dMaaPConfigurationFileLocation();
 
         // then
-        assertEquals(sanitizePath("etc/DmaapConfig.json"), cambriaConfigurationFileLocation);
+        assertEquals(sanitizePath("etc/ves-dmaap-config.json"), configurationFileLocation);
     }
 
     @Test
index 424ddf8..d33ae3e 100644 (file)
@@ -106,4 +106,4 @@ public class TLSTest extends TLSTestBase {
             when(settings.getExternalSchemaStndDefinedDataPath()).thenReturn(STND_DEFINED_DATA_PATH);
         }
     }
-}
\ No newline at end of file
+}
index 454cfb5..6d508d0 100644 (file)
@@ -1,9 +1,9 @@
 /*
  * ============LICENSE_START=======================================================
- * PROJECT
+ * VES Collector
  * ================================================================================
  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
+ * Copyright (C) 2018-2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.onap.dcae.common.model.StndDefinedNamespaceParameterNotDefinedException;
 import org.onap.dcae.common.model.VesEvent;
 import org.onap.dcae.common.publishing.DMaaPEventPublisher;
+import org.onap.dcae.restapi.EventValidatorException;
 
 import java.io.IOException;
 import java.util.List;
@@ -53,31 +54,18 @@ public class EventSenderTest {
     List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves7_valid_event.json");
 
     // when
-    eventSender.send(eventToSend);
+    assertThatExceptionOfType(EventValidatorException.class)
+            .isThrownBy(() -> eventSender.send(eventToSend));
 
     // then
     verifyThatEventWasNotSendAtStream();
   }
 
-  @Test
-  public void shouldSendEventAtStreamsAssignedToEventDomain() throws IOException {
-    // given
-    EventSender eventSender = givenConfiguredEventSender(HashMap.of("fault", new String[]{"ves-fault", "fault-ves"}));
-    List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves7_valid_event.json");
-
-    // when
-    eventSender.send(eventToSend);
-
-    //then
-    verifyThatEventWasSendAtStream("ves-fault");
-    verifyThatEventWasSendAtStream("fault-ves");
-  }
-
   @Test
   public void shouldSendStdDefinedEventAtStreamAssignedToEventDomain() throws IOException {
     // given
     EventSender eventSender = givenConfiguredEventSender(
-            HashMap.of("3GPP-FaultSupervision", new String[]{"ves-3gpp-fault-supervision"})
+            HashMap.of("3GPP-FaultSupervision", "ves-3gpp-fault-supervision")
     );
     List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves_stdnDefined_valid.json");
 
@@ -95,7 +83,8 @@ public class EventSenderTest {
     List<VesEvent> eventToSend = createEventToSend("/eventsAfterTransformation/ves_stdnDefined_valid.json");
 
     // when
-    eventSender.send(eventToSend);
+    assertThatExceptionOfType(EventValidatorException.class)
+            .isThrownBy(() -> eventSender.send(eventToSend));
 
     // then
     verifyThatEventWasNotSendAtStream();
@@ -122,7 +111,7 @@ public class EventSenderTest {
     return givenEventToSend(event);
   }
 
-  private EventSender givenConfiguredEventSender(io.vavr.collection.Map<String, String[]> streamIds) {
+  private EventSender givenConfiguredEventSender(io.vavr.collection.Map<String, String> streamIds) {
     return new EventSender(eventPublisher, streamIds);
   }
 
@@ -132,10 +121,10 @@ public class EventSenderTest {
   }
 
   private void verifyThatEventWasNotSendAtStream() {
-    verify(eventPublisher,never()).sendEvent(any(),any());
+      verify(eventPublisher,never()).sendEvent(any(),any());
   }
 
   private void verifyThatEventWasSendAtStream(String s) {
-    verify(eventPublisher).sendEvent(any(), eq(s));
-  }
+      verify(eventPublisher).sendEvent(any(), eq(s));
+    }
 }
index 923aae0..9aaeb28 100644 (file)
@@ -3,7 +3,7 @@
  * org.onap.dcaegen2.collectors.ves
  * ================================================================================
  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018,2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -56,29 +56,6 @@ public class DMaaPConfigurationParserTest {
         assertThat(authCredentialsKeysMissing.isSecured()).isFalse();
     }
 
-
-    @Test
-    public void testParseCredentialsForLegacy() {
-        Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json");
-        Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
-
-        PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull();
-        assertThat(authCredentialsNull.userName().isEmpty()).isTrue();
-        assertThat(authCredentialsNull.password().isEmpty()).isTrue();
-        assertThat(authCredentialsNull.isSecured()).isFalse();
-
-        PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
-        assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
-        assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
-        assertThat(authCredentialsPresent.isSecured()).isTrue();
-
-        PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
-        assertThat(authCredentialsMissing.userName().isEmpty()).isTrue();
-        assertThat(authCredentialsMissing.password().isEmpty()).isTrue();
-        assertThat(authCredentialsMissing.isSecured()).isFalse();
-    }
-
-
     @Test
     public void testParseGen2() {
         Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json");
@@ -93,22 +70,4 @@ public class DMaaPConfigurationParserTest {
         assertThat(withOtherSegment.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
     }
 
-    @Test
-    public void testParseLegacy() {
-        Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json");
-        Try<Map<String, PublisherConfig>> publisherConfigs = DMaaPConfigurationParser
-            .parseToDomainMapping(exemplaryConfig);
-
-        PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull();
-        assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904"));
-        assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
-
-        PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull();
-        assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
-        assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
-
-        PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull();
-        assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
-        assertThat(urlIsMissing.topic()).isEqualTo("DCAE-SE-COLLECTOR-EVENTS-DEV");
-    }
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPEventPublisherTest.java
deleted file mode 100644 (file)
index e4b6fd9..0000000
+++ /dev/null
@@ -1,126 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018,2020 Nokia. All rights reserved.
- * Copyright (C) 2020 AT&T. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import org.json.JSONObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dcae.common.model.VesEvent;
-
-import java.io.IOException;
-
-import static io.vavr.API.Option;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public class DMaaPEventPublisherTest {
-
-  private static final String STREAM_ID = "sampleStreamId";
-
-  private static final JSONObject EXPECTED_EVENT =
-      new JSONObject(
-          "{\"VESversion\":\"v7\",\"event\":{"
-              + "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019,"
-              + "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\","
-              + "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\","
-              + "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\","
-              + "\"eventType\":\"applicationVnf\",\"priority\":\"Normal\",\"version\":3,"
-              + "\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312,\"domain\":\"heartbeat\","
-              + "\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\","
-              + "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}");
-
-  private static final String PARTITION = "dns01cmd004";
-
-  private DMaaPEventPublisher eventPublisher;
-  private CambriaBatchingPublisher cambriaPublisher;
-  private DMaaPPublishersCache DMaaPPublishersCache;
-
-  @Before
-  public void setUp() {
-    cambriaPublisher = mock(CambriaBatchingPublisher.class);
-    DMaaPPublishersCache = mock(DMaaPPublishersCache.class);
-    when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
-    eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache);
-  }
-
-  @Test
-  public void shouldSendEventToTopic() throws Exception {
-    // when
-    eventPublisher.sendEvent(givenVesEventWithoutVESuniqueIdField(), STREAM_ID);
-
-    // then
-    verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString());
-  }
-
-  @Test
-  public void shouldRemoveInternalVESUIDBeforeSending() throws Exception {
-    // when
-    eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID);
-
-    // then
-    verify(cambriaPublisher).send(PARTITION, EXPECTED_EVENT.toString());
-  }
-
-  @Test
-  public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
-    // given
-    given(cambriaPublisher.send(anyString(), anyString()))
-        .willThrow(new IOException("Expected exception - test case scenario!"));
-
-    // when
-    eventPublisher.sendEvent(givenVesEventWithVESUniqueIdField(), STREAM_ID);
-
-    // then
-    verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
-  }
-
-  private VesEvent givenVesEventWithVESUniqueIdField() {
-    return new VesEvent(
-        new JSONObject(
-            "{\"VESversion\":\"v7\",\"VESuniqueId\":\"fd69d432-5cd5-4c15-9d34-407c81c61c6a-0\"," +
-                    "\"event\":{" +
-                    "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," +
-                    "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," +
-                    "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," +
-                    "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," +
-                    "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," +
-                    "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," +
-                    "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"));
-  }
-
-  private VesEvent givenVesEventWithoutVESuniqueIdField() {
-    return new VesEvent(
-            new JSONObject(
-                    "{\"VESversion\":\"v7\"," +
-                            "\"event\":{" +
-                            "\"commonEventHeader\":{\"startEpochMicrosec\":1537562659253019," +
-                            "\"sourceId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\"," +
-                            "\"eventId\":\"Heartbeat_vDNS_100.100.10.10\",\"nfcNamingCode\":\"DNS\"," +
-                            "\"reportingEntityId\":\"79e90d76-513a-4f79-886d-470a0037c5cf\",\"eventType\":\"applicationVnf\"," +
-                            "\"priority\":\"Normal\",\"version\":3,\"reportingEntityName\":\"dns01cmd004\",\"sequence\":36312," +
-                            "\"domain\":\"heartbeat\",\"lastEpochMicrosec\":1537562659253019,\"eventName\":\"Heartbeat_vDNS\"," +
-                            "\"sourceName\":\"dns01cmd004\",\"nfNamingCode\":\"MDNS\"}}}"));
-  }
-}
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/common/publishing/DMaaPPublishersCacheTest.java
deleted file mode 100644 (file)
index f4dbe19..0000000
+++ /dev/null
@@ -1,126 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * org.onap.dcaegen2.collectors.ves
- * ================================================================================
- * Copyright (C) 2018 Nokia. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.common.publishing;
-
-import static io.vavr.API.List;
-import static io.vavr.API.Map;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import com.att.nsa.cambria.client.CambriaBatchingPublisher;
-import io.vavr.collection.Map;
-import io.vavr.control.Option;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.onap.dcae.common.publishing.DMaaPPublishersCache.CambriaPublishersCacheLoader;
-import org.onap.dcae.common.publishing.DMaaPPublishersCache.OnPublisherRemovalListener;
-
-
-public class DMaaPPublishersCacheTest {
-
-    private String streamId1;
-    private Map<String, PublisherConfig> dMaaPConfigs;
-
-    @Before
-    public void setUp() {
-        streamId1 = "sampleStream1";
-        dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1"));
-    }
-
-    @Test
-    public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() {
-        // given
-        DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
-
-        // when
-        Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1);
-        Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1);
-
-        // then
-        assertSame("should return same instance", firstPublisher.get(), secondPublisher.get());
-    }
-
-    @Test
-    public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException {
-        // given
-        CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
-        CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
-        DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
-                                                                             new OnPublisherRemovalListener(),
-                                                                             dMaaPConfigs);
-        when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1);
-
-        // when
-        dMaaPPublishersCache.getPublisher(streamId1);
-        dMaaPPublishersCache.closePublisherFor(streamId1);
-
-        // then
-        verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS);
-
-    }
-
-    @Test
-    public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() {
-        // given
-        DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
-
-        // then
-        assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty());
-    }
-
-
-    @Test
-    public void shouldCloseOnlyChangedPublishers() throws IOException, InterruptedException {
-        // given
-        CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
-        CambriaBatchingPublisher cambriaPublisherMock2 = mock(CambriaBatchingPublisher.class);
-        CambriaPublishersCacheLoader cacheLoaderMock = mock(CambriaPublishersCacheLoader.class);
-        String firstDomain = "domain1";
-        String secondDomain = "domain2";
-        Map<String, PublisherConfig> oldConfig = Map(firstDomain,
-                                                     new PublisherConfig(List("destination1"), "topic1"),
-                                                     secondDomain,
-                                                     new PublisherConfig(List("destination2"), "topic2",
-                                                                         "user", "pass"));
-        Map<String, PublisherConfig> newConfig = Map(firstDomain, new PublisherConfig(List("destination1"), "topic1"),
-                                                     secondDomain, new PublisherConfig(List("destination2"), "topic2"));
-        DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
-                                                                             new OnPublisherRemovalListener(),
-                                                                             oldConfig);
-        when(cacheLoaderMock.load(firstDomain)).thenReturn(cambriaPublisherMock1);
-        when(cacheLoaderMock.load(secondDomain)).thenReturn(cambriaPublisherMock2);
-
-        dMaaPPublishersCache.getPublisher(firstDomain);
-        dMaaPPublishersCache.getPublisher(secondDomain);
-
-        // when
-        dMaaPPublishersCache.reconfigure(newConfig);
-
-        // then
-        verify(cambriaPublisherMock2).close(20, TimeUnit.SECONDS);
-        verifyZeroInteractions(cambriaPublisherMock1);
-    }
-}
\ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java b/src/test/java/org/onap/dcae/common/publishing/DMaapContainer.java
new file mode 100644 (file)
index 0000000..9ece10b
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START====================================
+ * VES Collector
+ * =========================================================
+ * Copyright (C) 2019-2021 Nokia. All rights reserved.
+ * =========================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================
+ */
+
+package org.onap.dcae.common.publishing;
+
+import org.testcontainers.containers.DockerComposeContainer;
+
+import java.io.File;
+import java.net.URL;
+
+final class DMaapContainer {
+    private static final String MR_COMPOSE_RESOURCE_NAME = "dmaap-msg-router/message-router-compose.yml";
+    private static final String DOCKER_COMPOSE_FILE_PATH = getDockerComposeFilePath(MR_COMPOSE_RESOURCE_NAME);
+    static final int DMAAP_SERVICE_EXPOSED_PORT = 3904;
+    static final String DMAAP_SERVICE_NAME = "onap-dmaap";
+
+    private DMaapContainer() {}
+
+
+    public static DockerComposeContainer createContainerInstance(){
+        return new DockerComposeContainer(
+                new File(DOCKER_COMPOSE_FILE_PATH))
+                .withExposedService(DMAAP_SERVICE_NAME, DMAAP_SERVICE_EXPOSED_PORT)
+                .withLocalCompose(true);
+    }
+
+
+
+    private static String getDockerComposeFilePath(String resourceName) {
+        URL resource = DMaapContainer.class.getClassLoader()
+                .getResource(resourceName);
+
+        if (resource != null) return resource.getFile();
+        else throw new RuntimeException(String
+                .format("File %s does not exist", resourceName));
+    }
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java b/src/test/java/org/onap/dcae/common/publishing/MessageRouterHttpStatusMapperTest.java
new file mode 100644 (file)
index 0000000..0e5ae90
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.onap.dcae.common.model.BackwardsCompatibilityException;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.springframework.http.HttpStatus;
+
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.onap.dcae.ApplicationSettings.responseCompatibility;
+import static org.onap.dcae.common.publishing.MessageRouterHttpStatusMapper.getHttpStatus;
+
+class MessageRouterHttpStatusMapperTest {
+
+    public static final String BACKWARDS_COMPATIBILITY = "v7.2";
+    public static final String BACKWARDS_COMPATIBILITY_NONE = "NONE";
+
+    @Test
+    void ves_shouldResponse202() {
+        //given
+        responseCompatibility = BACKWARDS_COMPATIBILITY;
+        MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+        when(messageRouterPublishResponse.successful()).thenReturn(true);
+
+        //when
+        HttpStatus httpStatusResponse = getHttpStatus(messageRouterPublishResponse);
+
+        //then
+        assertSame(HttpStatus.ACCEPTED, httpStatusResponse);
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = HttpStatus.class,
+            names = {"NOT_FOUND", "REQUEST_TIMEOUT", "TOO_MANY_REQUESTS", "INTERNAL_SERVER_ERROR", "BAD_GATEWAY",
+                    "SERVICE_UNAVAILABLE", "GATEWAY_TIMEOUT","PAYLOAD_TOO_LARGE"}
+    )
+    void ves_shouldMapErrorsToBackwardsCompatibility(HttpStatus httpStatus) {
+        //given
+        responseCompatibility = BACKWARDS_COMPATIBILITY;
+        MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+        when(messageRouterPublishResponse.failReason()).thenReturn(httpStatus.toString());
+
+        //when
+        //then
+        assertThrows(BackwardsCompatibilityException.class,()->getHttpStatus(messageRouterPublishResponse));
+    }
+
+    @Test
+    void ves_shouldResponse200WhenBackwardsCompatibilityIsNone() {
+        //given
+        responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+        MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+        when(messageRouterPublishResponse.successful()).thenReturn(true);
+
+        //when
+        HttpStatus httpStatusResponse = getHttpStatus(messageRouterPublishResponse);
+
+        //then
+        assertSame(HttpStatus.OK, httpStatusResponse);
+    }
+
+    @Test
+    void ves_shouldHandleError413WhenBackwardsCompatibilityIsNone() {
+        //given
+        responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+        MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+        when(messageRouterPublishResponse.failReason()).thenReturn(HttpStatus.PAYLOAD_TOO_LARGE.toString());
+
+        //when
+        //then
+        assertThrows(PayloadToLargeException.class,()->getHttpStatus(messageRouterPublishResponse));
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = HttpStatus.class,
+            names = {"NOT_FOUND", "REQUEST_TIMEOUT", "TOO_MANY_REQUESTS", "INTERNAL_SERVER_ERROR", "BAD_GATEWAY",
+                    "SERVICE_UNAVAILABLE", "GATEWAY_TIMEOUT"}
+    )
+    void ves_shouldMapErrorsTo503WhenBackwardsCompatibilityIsNone(HttpStatus httpStatus) {
+        //given
+        responseCompatibility = BACKWARDS_COMPATIBILITY_NONE;
+        MessageRouterPublishResponse messageRouterPublishResponse = mock(MessageRouterPublishResponse.class);
+        when(messageRouterPublishResponse.failReason()).thenReturn(httpStatus.toString());
+
+        //when
+        //then
+        assertThrows(InternalException.class,()->getHttpStatus(messageRouterPublishResponse));
+    }
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java b/src/test/java/org/onap/dcae/common/publishing/PublisherTest.java
new file mode 100644 (file)
index 0000000..f269b94
--- /dev/null
@@ -0,0 +1,78 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.testcontainers.containers.DockerComposeContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+
+import static org.onap.dcae.common.publishing.DMaapContainer.createContainerInstance;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
+
+
+@Testcontainers
+public class PublisherTest {
+
+    @Container
+    private final DockerComposeContainer CONTAINER = createContainerInstance();
+
+    @Test
+    void publishEvent_shouldSuccessfullyPublishSingleMessage() {
+        //given
+        final Publisher publisher = new Publisher();
+        final String simpleEvent = "{\"message\":\"message1\"}";
+        final List<String> twoJsonMessages = List.of(simpleEvent);
+        final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(twoJsonMessages, createPublishConfig());
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(Duration.ofSeconds(10));
+    }
+
+
+    private Option<PublisherConfig> createPublishConfig() {
+        List<String> desc = List.of("127.0.0.1:3904");
+        PublisherConfig conf = new PublisherConfig(desc, "topic");
+        return Option.of(conf);
+    }
+
+    private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
+        return ImmutableMessageRouterPublishResponse
+                .builder()
+                .items(items)
+                .build();
+    }
+
+}
diff --git a/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java b/src/test/java/org/onap/dcae/common/publishing/PublisherTestMockServer.java
new file mode 100644 (file)
index 0000000..dbecd53
--- /dev/null
@@ -0,0 +1,156 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+
+import com.google.gson.JsonElement;
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.junit.jupiter.MockServerExtension;
+import org.mockserver.junit.jupiter.MockServerSettings;
+import org.mockserver.matchers.Times;
+import org.mockserver.verify.VerificationTimes;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.createPublishRequest;
+import static org.onap.dcae.common.publishing.DmaapRequestConfiguration.getAsJsonElements;
+
+@ExtendWith(MockServerExtension.class)
+@MockServerSettings(ports = {1080, 8888})
+class PublisherTestMockServer {
+
+    private static final int MAX_IDLE_TIME = 10;
+    private static final int MAX_LIFE_TIME = 20;
+    private static final int CONNECTION_POOL = 1;
+    private static final String TOPIC = "TOPIC10";
+    private static final String PATH = String.format("/events/%s/", TOPIC);
+
+    private static final String TIMEOUT_ERROR_MESSAGE = "408 Request Timeout\n"
+            + "{"
+            + "\"requestError\":"
+            + "{"
+            + "\"serviceException\":"
+            + "{"
+            + "\"messageId\":\"SVC0001\","
+            + "\"text\":\"Client timeout exception occurred, Error code is %1\","
+            + "\"variables\":[\"408\"]"
+            + "}"
+            + "}"
+            + "}";
+
+    private final ClientAndServer client;
+
+    public PublisherTestMockServer(ClientAndServer client) {
+        this.client = client;
+    }
+
+    @Test
+    void publisher_shouldHandleClientTimeoutErrorWhenTimeoutDefined() {
+        //given
+        final Long timeoutSec = 1L;
+        final Publisher publisher = new Publisher(connectionPoolConfiguration());
+        final String simpleEvent = "{\"message\":\"message1\"}";
+        final MessageRouterPublishResponse expectedResponse = errorPublishResponse(TIMEOUT_ERROR_MESSAGE);
+
+        final String path = String.format("/events/%s/", TOPIC);
+        client.when(request().withPath(path), Times.once())
+               .respond(response().withDelay(TimeUnit.SECONDS, 2));
+        List<String> events = List.of(simpleEvent);
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(events, createPublishRequest(createPublishConfig(), timeoutSec));
+
+
+
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(Duration.ofSeconds(10));
+
+        //then
+        client.verify(request().withPath(path), VerificationTimes.exactly(1));
+
+    }
+
+    @Test
+    void publishEvent_shouldSuccessfullyPublishSingleMessage() {
+        //given
+        final Publisher publisher = new Publisher();
+        final String simpleEvent = "{\"message\":\"message1\"}";
+        final List<String> twoJsonMessages = List.of(simpleEvent);
+        final MessageRouterPublishResponse expectedResponse = successPublishResponse(getAsJsonElements(twoJsonMessages));
+        client.when(request().withPath(PATH), Times.once())
+                .respond(response());
+
+        //when
+        final Flux<MessageRouterPublishResponse> result = publisher.publishEvents(List.of(simpleEvent), createPublishConfig());
+
+        //then
+        StepVerifier.create(result)
+                .expectNext(expectedResponse)
+                .expectComplete()
+                .verify(Duration.ofSeconds(10));
+    }
+
+    private Option<PublisherConfig> createPublishConfig() {
+        List<String> desc = List.of("localhost:1080");
+        PublisherConfig conf = new PublisherConfig(desc, TOPIC);
+        return Option.of(conf);
+    }
+
+    private MessageRouterPublishResponse successPublishResponse(List<JsonElement> items) {
+        return ImmutableMessageRouterPublishResponse
+                .builder()
+                .items(items)
+                .build();
+    }
+
+    public static MessageRouterPublishResponse errorPublishResponse(String failReasonFormat, Object... formatArgs) {
+        String failReason = formatArgs.length == 0 ? failReasonFormat : String.format(failReasonFormat, formatArgs);
+        return ImmutableMessageRouterPublishResponse
+                .builder()
+                .failReason(failReason)
+                .build();
+    }
+
+    public MessageRouterPublisherConfig connectionPoolConfiguration() {
+        return ImmutableMessageRouterPublisherConfig.builder()
+                .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
+                        .connectionPool(CONNECTION_POOL)
+                        .maxIdleTime(MAX_IDLE_TIME)
+                        .maxLifeTime(MAX_LIFE_TIME)
+                        .build())
+                .build();
+    }
+}
diff --git a/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java b/src/test/java/org/onap/dcae/common/validator/BatchEventValidatorTest.java
new file mode 100644 (file)
index 0000000..05baa04
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.validator;
+
+import org.json.JSONObject;
+import org.junit.jupiter.api.Test;
+import org.onap.dcae.ApplicationSettings;
+import org.onap.dcae.common.EventUpdater;
+import org.onap.dcae.common.model.VesEvent;
+import org.onap.dcae.restapi.EventValidatorException;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.onap.dcae.common.validator.BatchEventValidator.executeBatchEventValidation;
+
+class BatchEventValidatorTest {
+
+    private final ApplicationSettings settings = mock(ApplicationSettings.class);
+    private final EventUpdater eventUpdater = new EventUpdater(settings);
+    private static final String EVENT = "event";
+    private static final String EVENT_LIST = "eventList";
+
+    @Test
+    void shouldThrowException_whenDomainFieldsHaveDifferentValues() throws IOException {
+        //given
+        final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_valid_two_different_domain.json", EVENT_LIST);
+
+        //when
+        //then
+        assertThrows(EventValidatorException.class, () -> executeBatchEventValidation(eventList));
+    }
+
+    @Test
+    void shouldNotThrowException_whenDomainFieldsHaveSameValues() throws IOException {
+        //given
+        final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_valid.json", EVENT_LIST);
+
+        //when
+        //then
+        assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+    }
+
+    @Test
+    void shouldThrowException_whenStndDefinedNamespaceFieldsHaveDifferentValuesAndDomainsAreStndDefined() throws IOException {
+        //given
+        final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json", EVENT_LIST);
+
+        //when
+        //then
+        assertThrows(EventValidatorException.class, () -> executeBatchEventValidation(eventList));
+    }
+
+    @Test
+    void shouldNotThrowException_whenStndDefinedNamespaceFieldsHaveSameValuesAndDomainsAreStndDefined() throws IOException {
+        //given
+        final List<VesEvent> eventList = prepareEventList("src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json", EVENT_LIST);
+
+        //when
+        //then
+        assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+    }
+
+    @Test
+    void shouldNotThrowException_whenSendValidNotBatchEvent() throws IOException {
+        //given
+        final List<VesEvent> eventList = prepareEventList("src/test/resources/ves_stdnDefined_valid.json", EVENT);
+
+        //when
+        //then
+        assertDoesNotThrow(() -> executeBatchEventValidation(eventList));
+    }
+
+    private List<VesEvent> prepareEventList(String pathToFile, String eventType) throws IOException {
+        final VesEvent vesEventFromJson = createVesEventFromJson(pathToFile);
+        return eventUpdater.convert(vesEventFromJson, "v7", UUID.randomUUID(), eventType);
+    }
+
+    private VesEvent createVesEventFromJson(String pathToFile) throws IOException {
+        Path path = Paths.get(pathToFile);
+        final List<String> lines = Files.readAllLines(path);
+        String str = String.join("", lines);
+        return new VesEvent(new JSONObject(str));
+    }
+
+}
diff --git a/src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java b/src/test/java/org/onap/dcae/multiplestreamreducer/MultipleStreamReducerTest.java
new file mode 100644 (file)
index 0000000..d085eb1
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * VES Collector
+ * ================================================================================
+ * Copyright (C) 2021 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.multiplestreamreducer;
+
+import io.vavr.collection.HashMap;
+import io.vavr.collection.Map;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class MultipleStreamReducerTest {
+
+    private final MultipleStreamReducer multipleStreamReducer = new MultipleStreamReducer();
+    private final Map<String, String[]> domainToStreams = HashMap.of(
+            "fault", new String[]{"ves-fault", "stream1", "stream2"},
+            "log", new String[]{"ves-syslog", "stream3", "stream4", "stream5"},
+            "test", new String[]{"stream6"}
+    );
+
+    @Test
+    void shouldReduceStreamsToTheFirstOne() {
+        //given
+        Map<String, String> expected = HashMap.of(
+                "fault", "ves-fault",
+                "log", "ves-syslog",
+                "test", "stream6"
+        );
+
+        //when
+        final Map<String, String> domainToStreamsAfterReduce = multipleStreamReducer.reduce(domainToStreams);
+
+        //then
+        assertEquals(expected, domainToStreamsAfterReduce);
+    }
+
+    @Test
+    void shouldReturnInfoAboutDomainToStreamsConfig() {
+        //given
+        final Map<String, String> domainToStreamsAfterReduce = multipleStreamReducer.reduce(domainToStreams);
+        String expectedRedundantStreamsInfo =
+                "Domain: fault has active stream: ves-fault\n" +
+                "Domain: log has active stream: ves-syslog\n" +
+                "Domain: test has active stream: stream6\n";
+
+        //when
+        final String domainToStreamsConfigInfo = multipleStreamReducer.getDomainToStreamsInfo(domainToStreamsAfterReduce);
+
+        //then
+        assertEquals(expectedRedundantStreamsInfo, domainToStreamsConfigInfo);
+    }
+
+}
index 9df0c69..931e7bc 100644 (file)
@@ -143,4 +143,4 @@ public class ApiAuthInterceptionTest {
     healthcheckRequest.setServerPort(serverPort);
     return healthcheckRequest;
   }
-}
\ No newline at end of file
+}
index a3c0628..9b43687 100644 (file)
@@ -1,8 +1,8 @@
 /*
  * ============LICENSE_START=======================================================
- * PROJECT
+ * VES Collector
  * ================================================================================
- * Copyright (C) 2020 Nokia. All rights reserved.s
+ * Copyright (C) 2020-2021 Nokia. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,24 +25,28 @@ import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import com.networknt.schema.JsonSchema;
 import io.vavr.collection.HashMap;
-import org.apache.http.HttpStatus;
 import org.jetbrains.annotations.NotNull;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
 import org.onap.dcae.ApplicationSettings;
 import org.onap.dcae.JSonSchemasSupplier;
 import org.onap.dcae.common.EventSender;
 import org.onap.dcae.common.EventTransformation;
 import org.onap.dcae.common.HeaderUtils;
 import org.onap.dcae.common.JsonDataLoader;
-import org.onap.dcae.common.model.VesEvent;
-import org.onap.dcae.common.validator.StndDefinedDataValidator;
+import org.onap.dcae.common.model.InternalException;
+import org.onap.dcae.common.model.PayloadToLargeException;
 import org.onap.dcae.common.publishing.DMaaPEventPublisher;
+import org.onap.dcae.common.validator.StndDefinedDataValidator;
 import org.slf4j.Logger;
+import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.mock.web.MockHttpServletRequest;
 import org.springframework.web.context.request.RequestContextHolder;
@@ -53,8 +57,10 @@ import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
@@ -63,14 +69,15 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-@RunWith(MockitoJUnitRunner.class)
+
+@ExtendWith(MockitoExtension.class)
 public class VesRestControllerTest {
 
     private static final String EVENT_TRANSFORM_FILE_PATH = "/eventTransform.json";
-    private static final String ACCEPTED = "Accepted";
+    private static final String ACCEPTED = "Successfully send event";
     private static final String VERSION_V7 = "v7";
-    public static final String VES_FAULT_TOPIC = "ves-fault";
-    public static final String VES_3_GPP_FAULT_SUPERVISION_TOPIC = "ves-3gpp-fault-supervision";
+    static final String VES_FAULT_TOPIC = "ves-fault";
+    static final String VES_3_GPP_FAULT_SUPERVISION_TOPIC = "ves-3gpp-fault-supervision";
 
     private VesRestController vesRestController;
 
@@ -92,18 +99,18 @@ public class VesRestControllerTest {
     @Mock
     private StndDefinedDataValidator stndDefinedDataValidator;
 
-    @Before
-    public void setUp(){
-        final HashMap<String, String[]> streamIds = HashMap.of(
-                "fault", new String[]{VES_FAULT_TOPIC},
-                "3GPP-FaultSupervision", new String[]{VES_3_GPP_FAULT_SUPERVISION_TOPIC}
+    @BeforeEach
+    void setUp(){
+        final HashMap<String, String> streamIds = HashMap.of(
+                "fault", VES_FAULT_TOPIC,
+                "3GPP-FaultSupervision", VES_3_GPP_FAULT_SUPERVISION_TOPIC
         );
         this.vesRestController = new VesRestController(applicationSettings, logger,
                 errorLogger, new EventSender(eventPublisher, streamIds), headerUtils, stndDefinedDataValidator);
     }
 
     @Test
-    public void shouldReportThatApiVersionIsNotSupported() {
+    void shouldReportThatApiVersionIsNotSupported() {
         // given
         when(applicationSettings.isVersionSupported("v20")).thenReturn(false);
         MockHttpServletRequest request = givenMockHttpServletRequest();
@@ -112,33 +119,33 @@ public class VesRestControllerTest {
         final ResponseEntity<String> event = vesRestController.event("", "v20", request);
 
         // then
-        assertThat(event.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST);
+        assertThat(event.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value());
         assertThat(event.getBody()).isEqualTo("API version v20 is not supported");
         verifyThatEventWasNotSend();
     }
 
     @Test
-    public void shouldTransformEventAccordingToEventTransformFile() throws IOException {
+    void shouldTransformEventAccordingToEventTransformFile() throws IOException {
         //given
         configureEventTransformations();
         configureHeadersForEventListener();
 
         MockHttpServletRequest request = givenMockHttpServletRequest();
-
         String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json");
+        when(eventPublisher.sendEvent(any(), any())).thenReturn((HttpStatus.OK));
 
         //when
         final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
 
         //then
-        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
         assertThat(response.getBody()).isEqualTo(ACCEPTED);
         verifyThatTransformedEventWasSend(eventPublisher, validEvent);
     }
 
 
     @Test
-    public void shouldSendBatchOfEvents() throws IOException {
+    void shouldSendBatchEvent() throws IOException {
         //given
         configureEventTransformations();
         configureHeadersForEventListener();
@@ -146,18 +153,18 @@ public class VesRestControllerTest {
         MockHttpServletRequest request = givenMockHttpServletRequest();
 
         String validEvent = JsonDataLoader.loadContent("/ves7_batch_valid.json");
-
+        when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK);
         //when
         final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request);
 
         //then
-        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
         assertThat(response.getBody()).isEqualTo(ACCEPTED);
-        verify(eventPublisher, times(2)).sendEvent(any(),any());
+        verify(eventPublisher, times(1)).sendEvent(any(),any());
     }
 
     @Test
-    public void shouldSendStndDomainEventIntoDomainStream() throws IOException {
+    void shouldSendStndDomainEventIntoDomainStream() throws IOException {
         //given
         configureEventTransformations();
         configureHeadersForEventListener();
@@ -166,19 +173,20 @@ public class VesRestControllerTest {
         configureSchemasSupplierForStndDefineEvent();
 
         String validEvent = JsonDataLoader.loadContent("/ves_stdnDefined_valid.json");
+        when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK);
 
         //when
         final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
 
         //then
-        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
         assertThat(response.getBody()).isEqualTo(ACCEPTED);
         verify(eventPublisher).sendEvent(any(),eq(VES_3_GPP_FAULT_SUPERVISION_TOPIC));
     }
 
 
     @Test
-    public void shouldReportThatStndDomainEventHasntGotNamespaceParameter() throws IOException {
+    void shouldReportThatStndDomainEventHasntGotNamespaceParameter() throws IOException {
         //given
         configureEventTransformations();
         configureHeadersForEventListener();
@@ -192,7 +200,7 @@ public class VesRestControllerTest {
         final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
 
         //then
-        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST);
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value());
         verifyErrorResponse(
                 response,
                 "SVC2006",
@@ -203,7 +211,7 @@ public class VesRestControllerTest {
     }
 
     @Test
-    public void shouldReportThatStndDomainEventNamespaceParameterIsEmpty() throws IOException {
+    void shouldReportThatStndDomainEventNamespaceParameterIsEmpty() throws IOException {
         //given
         configureEventTransformations();
         configureHeadersForEventListener();
@@ -217,7 +225,7 @@ public class VesRestControllerTest {
         final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
 
         //then
-        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_BAD_REQUEST);
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value());
         verifyErrorResponse(
                 response,
                 "SVC2006",
@@ -228,7 +236,7 @@ public class VesRestControllerTest {
     }
 
     @Test
-    public void shouldNotSendStndDomainEventWhenTopicCannotBeFoundInConfiguration() throws IOException {
+    void shouldNotSendStndDomainEventWhenTopicCannotBeFoundInConfiguration() throws IOException {
         //given
         configureEventTransformations();
         configureHeadersForEventListener();
@@ -240,13 +248,12 @@ public class VesRestControllerTest {
         final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
 
         //then
-        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
-        assertThat(response.getBody()).isEqualTo(ACCEPTED);
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.BAD_REQUEST.value());
         verifyThatEventWasNotSend();
     }
 
     @Test
-    public void shouldExecuteStndDefinedValidationWhenFlagIsOnTrue() throws IOException {
+    void shouldExecuteStndDefinedValidationWhenFlagIsOnTrue() throws IOException {
         //given
         configureEventTransformations();
         configureHeadersForEventListener();
@@ -254,18 +261,18 @@ public class VesRestControllerTest {
         MockHttpServletRequest request = givenMockHttpServletRequest();
         String validEvent = JsonDataLoader.loadContent("/ves7_batch_with_stndDefined_valid.json");
         when(applicationSettings.getExternalSchemaValidationCheckflag()).thenReturn(true);
-
+        when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK);
         //when
         final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request);
 
         //then
-        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
         assertThat(response.getBody()).isEqualTo(ACCEPTED);
         verify(stndDefinedDataValidator, times(2)).validate(any());
     }
 
     @Test
-    public void shouldNotExecuteStndDefinedValidationWhenFlagIsOnFalse() throws IOException {
+    void shouldNotExecuteStndDefinedValidationWhenFlagIsOnFalse() throws IOException {
         //given
         configureEventTransformations();
         configureHeadersForEventListener();
@@ -273,16 +280,76 @@ public class VesRestControllerTest {
         MockHttpServletRequest request = givenMockHttpServletRequest();
         String validEvent = JsonDataLoader.loadContent("/ves7_batch_with_stndDefined_valid.json");
         when(applicationSettings.getExternalSchemaValidationCheckflag()).thenReturn(false);
+        when(eventPublisher.sendEvent(any(), any())).thenReturn(HttpStatus.OK);
 
         //when
         final ResponseEntity<String> response = vesRestController.events(validEvent, VERSION_V7, request);
 
         //then
-        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SC_ACCEPTED);
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.OK.value());
         assertThat(response.getBody()).isEqualTo(ACCEPTED);
         verify(stndDefinedDataValidator, times(0)).validate(any());
     }
 
+    @Test
+    void shouldReturn413WhenPayloadIsTooLarge() throws IOException {
+        //given
+        configureEventTransformations();
+        configureHeadersForEventListener();
+
+        MockHttpServletRequest request = givenMockHttpServletRequest();
+        when(eventPublisher.sendEvent(any(), any())).thenThrow(new PayloadToLargeException());
+        String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json");
+
+        //when
+        final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
+
+        //then
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.PAYLOAD_TOO_LARGE.value());
+        verifyErrorResponse(
+                response,
+                "SVC2000",
+                "The following service error occurred: %1. Error code is %2",
+                List.of("Request Entity Too Large","413")
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("errorsCodeAndResponseBody")
+    void shouldMapErrorTo503AndReturnOriginalBody(ApiException apiException,String bodyVariable,String bodyVariable2) throws IOException {
+        //given
+        configureEventTransformations();
+        configureHeadersForEventListener();
+
+        MockHttpServletRequest request = givenMockHttpServletRequest();
+        when(eventPublisher.sendEvent(any(), any())).thenThrow(new InternalException(apiException));
+        String validEvent = JsonDataLoader.loadContent("/ves7_valid_30_1_1_event.json");
+
+        //when
+        final ResponseEntity<String> response = vesRestController.event(validEvent, VERSION_V7, request);
+
+        //then
+        assertThat(response.getStatusCodeValue()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE.value());
+        verifyErrorResponse(
+                response,
+                "SVC2000",
+                "The following service error occurred: %1. Error code is %2",
+                List.of(bodyVariable,bodyVariable2)
+        );
+    }
+
+    private static Stream<Arguments> errorsCodeAndResponseBody() {
+        return Stream.of(
+                arguments(ApiException.NOT_FOUND, "Not Found","404"),
+                arguments(ApiException.REQUEST_TIMEOUT, "Request Timeout","408"),
+                arguments(ApiException.TOO_MANY_REQUESTS, "Too Many Requests","429"),
+                arguments(ApiException.INTERNAL_SERVER_ERROR, "Internal Server Error","500"),
+                arguments(ApiException.BAD_GATEWAY, "Bad Gateway","502"),
+                arguments(ApiException.SERVICE_UNAVAILABLE, "Service Unavailable","503"),
+                arguments(ApiException.GATEWAY_TIMEOUT, "Gateway Timeout","504")
+        );
+    }
+
     private void verifyThatEventWasNotSend() {
         verify(eventPublisher, never()).sendEvent(any(), any());
     }
@@ -313,7 +380,7 @@ public class VesRestControllerTest {
         final List<EventTransformation> eventTransformations = loadEventTransformations();
         when(applicationSettings.isVersionSupported(VERSION_V7)).thenReturn(true);
         when(applicationSettings.eventTransformingEnabled()).thenReturn(true);
-        when(applicationSettings.getEventTransformations()).thenReturn(eventTransformations);
+        when(applicationSettings.getEventTransformations()).thenReturn((eventTransformations));
     }
 
     private void configureHeadersForEventListener() {
@@ -326,11 +393,11 @@ public class VesRestControllerTest {
         assertThat(eventBeforeTransformation).contains("\"version\": \"4.0.1\"");
         assertThat(eventBeforeTransformation).contains("\"faultFieldsVersion\": \"4.0\"");
 
-        ArgumentCaptor<VesEvent> argument = ArgumentCaptor.forClass(VesEvent.class);
+        ArgumentCaptor<List> argument = ArgumentCaptor.forClass(List.class);
         ArgumentCaptor<String> domain = ArgumentCaptor.forClass(String.class);
         verify(eventPublisher).sendEvent(argument.capture(), domain.capture());
 
-        final String transformedEvent = argument.getValue().asJsonObject().toString();
+        final String transformedEvent = argument.getValue().toString();
         final String eventSentAtTopic = domain.getValue();
 
         // event after transformation
diff --git a/src/test/java/org/onap/dcae/vestest/TestVESLogger.java b/src/test/java/org/onap/dcae/vestest/TestVESLogger.java
deleted file mode 100644 (file)
index 1689263..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.vestest;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNotSame;
-import static org.onap.dcae.common.VESLogger.REQUEST_ID;
-
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import java.util.UUID;
-import org.junit.Test;
-import org.onap.dcae.common.VESLogger;
-
-public class TestVESLogger {
-
-    @Test
-    public void shouldOnLoggingContextInitializationPutRandomUuidAsRequestId() {
-        LoggingContext commonLoggingContext = VESLogger.getCommonLoggingContext();
-        String requestId = commonLoggingContext.get(REQUEST_ID, "default");
-
-        assertNotNull(requestId);
-        assertNotSame(requestId, "default");
-
-    }
-
-    @Test
-    public void shouldOnLoggingContextInitializationPutGivenUuuidAsRequestIdAndSupplyEndTimestamp() {
-        final UUID uuid = UUID.randomUUID();
-        LoggingContext loggingContextForThread = VESLogger.getLoggingContextForThread(uuid);
-        String requestId = loggingContextForThread.get(REQUEST_ID, "default");
-        String endTimestamp = loggingContextForThread.get(EcompFields.kEndTimestamp, "default");
-
-        assertNotNull(requestId);
-        assertNotNull(endTimestamp);
-        assertNotSame(endTimestamp, "default");
-        assertEquals(requestId, uuid.toString());
-    }
-
-    @Test
-    public void shouldOnLoggingContextInitializationPutGivenUuidAsRequestIdAndSupplyEndTimestampAndCompleteStatusCode() {
-        final UUID uuid = UUID.randomUUID();
-        LoggingContext loggingContextForThread = VESLogger.getLoggingContextForThread(uuid.toString());
-        String requestId = loggingContextForThread.get(REQUEST_ID, "default");
-        String statusCode = loggingContextForThread.get("statusCode", "default");
-        String endTimestamp = loggingContextForThread.get(EcompFields.kEndTimestamp, "default");
-
-        assertNotNull(requestId);
-        assertNotNull(endTimestamp);
-        assertNotNull(statusCode);
-        assertNotSame(endTimestamp, "default");
-        assertEquals(requestId, uuid.toString());
-        assertEquals(statusCode, "COMPLETE");
-    }
-
-}
-
diff --git a/src/test/resources/dmaap-msg-router/MsgRtrApi.properties b/src/test/resources/dmaap-msg-router/MsgRtrApi.properties
new file mode 100644 (file)
index 0000000..d288bd2
--- /dev/null
@@ -0,0 +1,155 @@
+# LICENSE_START=======================================================
+#  org.onap.dmaap
+#  ================================================================================
+#  Copyright Â© 2017 AT&T Intellectual Property. All rights reserved.
+#  ================================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=========================================================
+#
+#  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+###############################################################################
+###############################################################################
+##
+## Cambria API Server config
+##
+## Default values are shown as commented settings.
+##
+###############################################################################
+##
+## HTTP service
+##
+## 3904 is standard as of 7/29/14.
+#
+## Zookeeper Connection
+##
+## Both Cambria and Kafka make use of Zookeeper.
+##
+#config.zk.servers=172.18.1.1
+#config.zk.servers={{.Values.zookeeper.name}}:{{.Values.zookeeper.port}}
+config.zk.servers=zookeeper
+#config.zk.root=/fe3c/cambria/config
+###############################################################################
+##
+## Kafka Connection
+##
+##        Items below are passed through to Kafka's producer and consumer
+##        configurations (after removing "kafka.")
+##        if you want to change request.required.acks it can take this one value
+#kafka.metadata.broker.list=localhost:9092,localhost:9093
+#kafka.metadata.broker.list={{.Values.kafka.name}}:{{.Values.kafka.port}}
+kafka.metadata.broker.list=kafka:9092
+##kafka.request.required.acks=-1
+#kafka.client.zookeeper=${config.zk.servers}
+consumer.timeout.ms=100
+zookeeper.connection.timeout.ms=6000
+zookeeper.session.timeout.ms=20000
+zookeeper.sync.time.ms=2000
+auto.commit.interval.ms=1000
+fetch.message.max.bytes=1000000
+auto.commit.enable=false
+#(backoff*retries > zksessiontimeout)
+kafka.rebalance.backoff.ms=10000
+kafka.rebalance.max.retries=6
+###############################################################################
+##
+##        Secured Config
+##
+##        Some data stored in the config system is sensitive -- API keys and secrets,
+##        for example. to protect it, we use an encryption layer for this section
+##        of the config.
+##
+## The key is a base64 encode AES key. This must be created/configured for
+## each installation.
+#cambria.secureConfig.key=
+##
+## The initialization vector is a 16 byte value specific to the secured store.
+## This must be created/configured for each installation.
+#cambria.secureConfig.iv=
+## Southfield Sandbox
+cambria.secureConfig.key=b/7ouTn9FfEw2PQwL0ov/Q==
+cambria.secureConfig.iv=wR9xP5k5vbz/xD0LmtqQLw==
+authentication.adminSecret=fe3cCompound
+#cambria.secureConfig.key[pc569h]=YT3XPyxEmKCTLI2NK+Sjbw==
+#cambria.secureConfig.iv[pc569h]=rMm2jhR3yVnU+u2V9Ugu3Q==
+###############################################################################
+##
+## Consumer Caching
+##
+##        Kafka expects live connections from the consumer to the broker, which
+##        obviously doesn't work over connectionless HTTP requests. The Cambria
+##        server proxies HTTP requests into Kafka consumer sessions that are kept
+##        around for later re-use. Not doing so is costly for setup per request,
+##        which would substantially impact a high volume consumer's performance.
+##
+##        This complicates Cambria server failover, because we often need server
+##        A to close its connection before server B brings up the replacement.
+##
+## The consumer cache is normally enabled.
+#cambria.consumer.cache.enabled=true
+## Cached consumers are cleaned up after a period of disuse. The server inspects
+## consumers every sweepFreqSeconds and will clean up any connections that are
+## dormant for touchFreqMs.
+#cambria.consumer.cache.sweepFreqSeconds=15
+cambria.consumer.cache.touchFreqMs=120000
+##stickforallconsumerrequests=false
+## The cache is managed through ZK. The default value for the ZK connection
+## string is the same as config.zk.servers.
+#cambria.consumer.cache.zkConnect=${config.zk.servers}
+
+##
+## Shared cache information is associated with this node's name. The default
+## name is the hostname plus the HTTP service port this host runs on. (The
+## hostname is determined via InetAddress.getLocalHost ().getCanonicalHostName(),
+## which is not always adequate.) You can set this value explicitly here.
+##
+#cambria.api.node.identifier=<use-something-unique-to-this-instance>
+
+#cambria.rateLimit.maxEmptyPollsPerMinute=30
+#cambria.rateLimitActual.delay.ms=10
+###############################################################################
+##
+## Metrics Reporting
+##
+##        This server can report its metrics periodically on a topic.
+##
+#metrics.send.cambria.enabled=true
+#metrics.send.cambria.topic=cambria.apinode.metrics                                  #msgrtr.apinode.metrics.dmaap
+#metrics.send.cambria.sendEverySeconds=60
+cambria.consumer.cache.zkBasePath=/fe3c/cambria/consumerCache
+consumer.timeout=17
+default.partitions=3
+default.replicas=3
+##############################################################################
+#100mb
+maxcontentlength=10000
+##############################################################################
+#AAF Properties
+msgRtr.namespace.aaf=org.onap.dmaap.mr.topic
+msgRtr.topicfactory.aaf=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:
+enforced.topic.name.AAF=org.onap.dmaap.mr
+forceAAF=false
+transidUEBtopicreqd=false
+defaultNSforUEB=org.onap.dmaap.mr
+##############################################################################
+#Mirror Maker Agent
+msgRtr.mirrormakeradmin.aaf=org.onap.dmaap.mr.mirrormaker|*|admin
+msgRtr.mirrormakeruser.aaf=org.onap.dmaap.mr.mirrormaker|*|user
+msgRtr.mirrormakeruser.aaf.create=org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:
+msgRtr.mirrormaker.timeout=15000
+msgRtr.mirrormaker.topic=org.onap.dmaap.mr.mirrormakeragent
+msgRtr.mirrormaker.consumergroup=mmagentserver
+msgRtr.mirrormaker.consumerid=1
+kafka.max.poll.interval.ms=300000
+kafka.heartbeat.interval.ms=60000
+kafka.session.timeout.ms=240000
+kafka.max.poll.records=1000
diff --git a/src/test/resources/dmaap-msg-router/cadi.properties b/src/test/resources/dmaap-msg-router/cadi.properties
new file mode 100644 (file)
index 0000000..f2a3cdc
--- /dev/null
@@ -0,0 +1,18 @@
+aaf_url=https://AAF_LOCATE_URL/onap.org.osaaf.aaf.service:2.1
+aaf_env=DEV
+aaf_lur=org.onap.aaf.cadi.aaf.v2_0.AAFLurPerm
+
+cadi_truststore=/appl/dmaapMR1/etc/org.onap.dmaap.mr.trust.jks
+cadi_truststore_password=8FyfX+ar;0$uZQ0h9*oXchNX
+
+cadi_keyfile=/appl/dmaapMR1/etc/org.onap.dmaap.mr.keyfile
+
+cadi_alias=dmaapmr@mr.dmaap.onap.org
+cadi_keystore=/appl/dmaapMR1/etc/org.onap.dmaap.mr.p12
+cadi_keystore_password=GDQttV7)BlOvWMf6F7tz&cjy
+cadi_x509_issuers=CN=intermediateCA_1, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_7, OU=OSAAF, O=ONAP, C=US:CN=intermediateCA_9, OU=OSAAF, O=ONAP, C=US
+
+cadi_loglevel=INFO
+cadi_protocols=TLSv1.1,TLSv1.2
+cadi_latitude=37.78187
+cadi_longitude=-122.26147
diff --git a/src/test/resources/dmaap-msg-router/logback.xml b/src/test/resources/dmaap-msg-router/logback.xml
new file mode 100644 (file)
index 0000000..a39d9e4
--- /dev/null
@@ -0,0 +1,207 @@
+<!--
+     ============LICENSE_START=======================================================
+     Copyright Â© 2019 AT&T Intellectual Property. All rights reserved.
+     ================================================================================
+     Licensed under the Apache License, Version 2.0 (the "License");
+     you may not use this file except in compliance with the License.
+     You may obtain a copy of the License at
+           http://www.apache.org/licenses/LICENSE-2.0
+
+     Unless required by applicable law or agreed to in writing, software
+     distributed under the License is distributed on an "AS IS" BASIS,
+     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     See the License for the specific language governing permissions and
+     limitations under the License.
+     ============LICENSE_END=========================================================
+ -->
+
+<configuration scan="true" scanPeriod="3 seconds" debug="false">
+  <contextName>${module.ajsc.namespace.name}</contextName>
+  <jmxConfigurator/>
+  <property name="logDirectory" value="${AJSC_HOME}/log"/>
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <filter class="ch.qos.logback.classic.filter.LevelFilter">
+      <level>ERROR</level>
+      <onMatch>ACCEPT</onMatch>
+      <onMismatch>DENY</onMismatch>
+    </filter>
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{1024} - %msg%n
+      </pattern>
+    </encoder>
+  </appender>
+
+  <appender name="INFO" class="ch.qos.logback.core.ConsoleAppender">
+    <filter class="ch.qos.logback.classic.filter.LevelFilter">
+      <level>INFO</level>
+      <onMatch>ACCEPT</onMatch>
+      <onMismatch>DENY</onMismatch>
+    </filter>
+  </appender>
+
+  <appender name="DEBUG" class="ch.qos.logback.core.ConsoleAppender">
+
+    <encoder>
+      <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern>
+    </encoder>
+  </appender>
+
+  <appender name="ERROR" class="ch.qos.logback.core.ConsoleAppender">class="ch.qos.logback.core.ConsoleAppender">
+    <filter class="ch.qos.logback.classic.filter.LevelFilter">
+      <level>ERROR</level>
+      <onMatch>ACCEPT</onMatch>
+      <onMismatch>DENY</onMismatch>
+    </filter>
+    <encoder>
+      <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern>
+    </encoder>
+  </appender>
+
+
+  <!-- Msgrtr related loggers -->
+  <logger name="org.onap.dmaap.dmf.mr.service" level="INFO"/>
+  <logger name="org.onap.dmaap.dmf.mr.service.impl" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.resources" level="INFO"/>
+  <logger name="org.onap.dmaap.dmf.mr.resources.streamReaders" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.backends" level="INFO"/>
+  <logger name="org.onap.dmaap.dmf.mr.backends.kafka" level="INFO"/>
+  <logger name="org.onap.dmaap.dmf.mr.backends.memory" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.beans" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.constants" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.exception" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.listener" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.metrics.publisher" level="INFO"/>
+  <logger name="org.onap.dmaap.dmf.mr.metrics.publisher.impl" level="INFO"/>
+
+
+  <logger name="org.onap.dmaap.dmf.mr.security" level="INFO"/>
+  <logger name="org.onap.dmaap.dmf.mr.security.impl" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.transaction" level="INFO"/>
+  <logger name="com.att.dmf.mr.transaction.impl" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/>
+  <logger name="org.onap.dmaap.dmf.mr.metabroker" level="INFO"/>
+
+  <logger name="org.onap.dmaap.dmf.mr.utils" level="INFO"/>
+  <logger name="org.onap.dmaap.mr.filter" level="INFO"/>
+
+  <!--<logger name="com.att.nsa.cambria.*" level="INFO" />-->
+
+  <!-- Msgrtr loggers in ajsc -->
+  <logger name="org.onap.dmaap.service" level="INFO"/>
+  <logger name="org.onap.dmaap" level="INFO"/>
+
+
+  <!-- Spring related loggers -->
+  <logger name="org.springframework" level="WARN" additivity="false"/>
+  <logger name="org.springframework.beans" level="WARN" additivity="false"/>
+  <logger name="org.springframework.web" level="WARN" additivity="false"/>
+  <logger name="com.blog.spring.jms" level="WARN" additivity="false"/>
+
+  <!-- AJSC Services (bootstrap services) -->
+  <logger name="ajsc" level="WARN" additivity="false"/>
+  <logger name="ajsc.RouteMgmtService" level="INFO" additivity="false"/>
+  <logger name="ajsc.ComputeService" level="INFO" additivity="false"/>
+  <logger name="ajsc.VandelayService" level="WARN" additivity="false"/>
+  <logger name="ajsc.FilePersistenceService" level="WARN" additivity="false"/>
+  <logger name="ajsc.UserDefinedJarService" level="WARN" additivity="false"/>
+  <logger name="ajsc.UserDefinedBeansDefService" level="WARN" additivity="false"/>
+  <logger name="ajsc.LoggingConfigurationService" level="WARN" additivity="false"/>
+
+  <!-- AJSC related loggers (DME2 Registration, csi logging, restlet, servlet
+    logging) -->
+  <logger name="ajsc.utils" level="WARN" additivity="false"/>
+  <logger name="ajsc.utils.DME2Helper" level="INFO" additivity="false"/>
+  <logger name="ajsc.filters" level="DEBUG" additivity="false"/>
+  <logger name="ajsc.beans.interceptors" level="DEBUG" additivity="false"/>
+  <logger name="ajsc.restlet" level="DEBUG" additivity="false"/>
+  <logger name="ajsc.servlet" level="DEBUG" additivity="false"/>
+  <logger name="com.att" level="WARN" additivity="false"/>
+  <logger name="com.att.ajsc.csi.logging" level="WARN" additivity="false"/>
+  <logger name="com.att.ajsc.filemonitor" level="WARN" additivity="false"/>
+
+  <logger name="com.att.nsa.dmaap.util" level="INFO" additivity="false"/>
+  <logger name="com.att.cadi.filter" level="INFO" additivity="false"/>
+
+
+  <!-- Other Loggers that may help troubleshoot -->
+  <logger name="net.sf" level="WARN" additivity="false"/>
+  <logger name="org.apache.commons.httpclient" level="WARN" additivity="false"/>
+  <logger name="org.apache.commons" level="WARN" additivity="false"/>
+  <logger name="org.apache.coyote" level="WARN" additivity="false"/>
+  <logger name="org.apache.jasper" level="WARN" additivity="false"/>
+
+  <!-- Camel Related Loggers (including restlet/servlet/jaxrs/cxf logging.
+    May aid in troubleshooting) -->
+  <logger name="org.apache.camel" level="WARN" additivity="false"/>
+  <logger name="org.apache.cxf" level="WARN" additivity="false"/>
+  <logger name="org.apache.camel.processor.interceptor" level="WARN" additivity="false"/>
+  <logger name="org.apache.cxf.jaxrs.interceptor" level="WARN" additivity="false"/>
+  <logger name="org.apache.cxf.service" level="WARN" additivity="false"/>
+  <logger name="org.restlet" level="DEBUG" additivity="false"/>
+  <logger name="org.apache.camel.component.restlet" level="DEBUG" additivity="false"/>
+  <logger name="org.apache.kafka" level="DEBUG" additivity="false"/>
+  <logger name="org.apache.zookeeper" level="INFO" additivity="false"/>
+  <logger name="org.I0Itec.zkclient" level="DEBUG" additivity="false"/>
+
+  <!-- logback internals logging -->
+  <logger name="ch.qos.logback.classic" level="INFO" additivity="false"/>
+  <logger name="ch.qos.logback.core" level="INFO" additivity="false"/>
+
+  <!-- logback jms appenders & loggers definition starts here -->
+  <!-- logback jms appenders & loggers definition starts here -->
+  <appender name="auditLogs" class="ch.qos.logback.core.ConsoleAppender">
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+    </filter>
+    <encoder>
+      <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern>
+    </encoder>
+  </appender>
+  <appender name="perfLogs" class="ch.qos.logback.core.ConsoleAppender">
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+    </filter>
+    <encoder>
+      <pattern>"%d [%thread] %-5level %logger{1024} - %msg%n"</pattern>
+    </encoder>
+  </appender>
+  <appender name="ASYNC-audit" class="ch.qos.logback.classic.AsyncAppender">
+    <queueSize>1000</queueSize>
+    <discardingThreshold>0</discardingThreshold>
+    <appender-ref ref="Audit-Record-Queue"/>
+  </appender>
+
+  <logger name="AuditRecord" level="INFO" additivity="FALSE">
+    <appender-ref ref="STDOUT"/>
+  </logger>
+  <logger name="AuditRecord_DirectCall" level="INFO" additivity="FALSE">
+    <appender-ref ref="STDOUT"/>
+  </logger>
+  <appender name="ASYNC-perf" class="ch.qos.logback.classic.AsyncAppender">
+    <queueSize>1000</queueSize>
+    <discardingThreshold>0</discardingThreshold>
+    <appender-ref ref="Performance-Tracker-Queue"/>
+  </appender>
+  <logger name="PerfTrackerRecord" level="INFO" additivity="FALSE">
+    <appender-ref ref="ASYNC-perf"/>
+    <appender-ref ref="perfLogs"/>
+  </logger>
+  <!-- logback jms appenders & loggers definition ends here -->
+
+  <root level="DEBUG">
+    <appender-ref ref="DEBUG"/>
+    <appender-ref ref="ERROR"/>
+    <appender-ref ref="INFO"/>
+    <appender-ref ref="STDOUT"/>
+  </root>
+
+</configuration>
diff --git a/src/test/resources/dmaap-msg-router/message-router-compose.yml b/src/test/resources/dmaap-msg-router/message-router-compose.yml
new file mode 100644 (file)
index 0000000..e110a96
--- /dev/null
@@ -0,0 +1,82 @@
+version: '2'
+services:
+  zookeeper:
+    image: nexus3.onap.org:10001/onap/dmaap/zookeeper:6.0.3
+    ports:
+      - "2181:2181"
+    environment:
+      ZOOKEEPER_REPLICAS: 1
+      ZOOKEEPER_TICK_TIME: 2000
+      ZOOKEEPER_SYNC_LIMIT: 5
+      ZOOKEEPER_INIT_LIMIT: 10
+      ZOOKEEPER_MAX_CLIENT_CNXNS: 200
+      ZOOKEEPER_AUTOPURGE_SNAP_RETAIN_COUNT: 3
+      ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: 24
+      ZOOKEEPER_CLIENT_PORT: 2181
+      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/zookeeper/secrets/jaas/zk_server_jaas.conf -Dzookeeper.kerberos.removeHostFromPrincipal=true -Dzookeeper.kerberos.removeRealmFromPrincipal=true -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dzookeeper.requireClientAuthScheme=sasl
+      ZOOKEEPER_SERVER_ID: 1
+    volumes:
+      - ./zk_server_jaas.conf:/etc/zookeeper/secrets/jaas/zk_server_jaas.conf
+    networks:
+      net:
+        aliases:
+          - zookeeper
+
+  kafka:
+    image: nexus3.onap.org:10001/onap/dmaap/kafka111:1.0.5
+    ports:
+      - "9092:9092"
+    environment:
+      enableCadi: 'false'
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 40000
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 40000
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT://kafka:9092
+      KAFKA_LISTENERS: INTERNAL_PLAINTEXT://0.0.0.0:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT
+      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
+      KAFKA_OPTS: -Djava.security.auth.login.config=/etc/kafka/secrets/jaas/zk_client_jaas.conf
+      KAFKA_ZOOKEEPER_SET_ACL: 'true'
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    volumes:
+      - ./zk_client_jaas.conf:/etc/kafka/secrets/jaas/zk_client_jaas.conf
+    networks:
+      net:
+        aliases:
+          - kafka
+    depends_on:
+      - zookeeper
+
+  onap-dmaap:
+    image: nexus3.onap.org:10001/onap/dmaap/dmaap-mr:1.1.20
+    ports:
+      - "3904:3904"
+      - "3905:3905"
+    environment:
+      enableCadi: 'false'
+    volumes:
+      - ./MsgRtrApi.properties:/appl/dmaapMR1/bundleconfig/etc/appprops/MsgRtrApi.properties
+      - ./logback.xml:/appl/dmaapMR1/bundleconfig/etc/logback.xml
+      - ./cadi.properties:/appl/dmaapMR1/etc/cadi.properties
+    networks:
+      net:
+        aliases:
+          - onap-dmaap
+    depends_on:
+      - zookeeper
+      - kafka
+
+  mockserver:
+    image: mockserver/mockserver:mockserver-5.11.2
+    command: -serverPort 1090 -proxyRemotePort 3904 -proxyRemoteHost onap-dmaap
+    ports:
+      - "1080:1090"
+    networks:
+      - net
+    depends_on:
+      - onap-dmaap
+
+networks:
+  net:
+    driver: bridge
diff --git a/src/test/resources/dmaap-msg-router/zk_client_jaas.conf b/src/test/resources/dmaap-msg-router/zk_client_jaas.conf
new file mode 100644 (file)
index 0000000..d4ef1eb
--- /dev/null
@@ -0,0 +1,5 @@
+Client {
+   org.apache.zookeeper.server.auth.DigestLoginModule required
+   username="kafka"
+   password="kafka_secret";
+ };
\ No newline at end of file
diff --git a/src/test/resources/dmaap-msg-router/zk_server_jaas.conf b/src/test/resources/dmaap-msg-router/zk_server_jaas.conf
new file mode 100644 (file)
index 0000000..26bf460
--- /dev/null
@@ -0,0 +1,4 @@
+Server {
+       org.apache.zookeeper.server.auth.DigestLoginModule required
+       user_kafka=kafka_secret;
+};
\ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPCredentialsLegacy.json b/src/test/resources/testParseDMaaPCredentialsLegacy.json
deleted file mode 100644 (file)
index ca59c7e..0000000
+++ /dev/null
@@ -1,26 +0,0 @@
-{
-  "channels": [
-    {
-      "name": "auth-credentials-null",
-      "cambria.url": "127.0.0.1:3904",
-      "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
-      "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
-      "basicAuthPassword": null,
-      "basicAuthUsername": null,
-    },
-    {
-      "name": "auth-credentials-present",
-      "cambria.url": "127.0.0.1:3904",
-      "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
-      "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
-      "basicAuthPassword": "samplePassword",
-      "basicAuthUsername": "sampleUser",
-    },
-    {
-      "name": "auth-credentials-missing",
-      "cambria.url": "127.0.0.1:3904",
-      "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
-      "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
-    }
-  ]
-}
\ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPLegacy.json b/src/test/resources/testParseDMaaPLegacy.json
deleted file mode 100644 (file)
index 9661e30..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-{
-  "channels": [
-    {
-      "name": "url-precedes-hosts",
-      "cambria.url": "127.0.0.1:3904",
-      "cambria.hosts": "h1.att.com,h2.att.com",
-      "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
-    },
-    {
-      "name": "url-key-missing",
-      "cambria.hosts": "h1.att.com,h2.att.com",
-      "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV",
-    },
-    {
-      "name": "url-is-null",
-      "cambria.url": null,
-      "cambria.hosts": "h1.att.com,h2.att.com",
-      "cambria.topic": "DCAE-SE-COLLECTOR-EVENTS-DEV"
-    }
-  ]
-}
\ No newline at end of file
diff --git a/src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json b/src/test/resources/ves7_batch_stdnDefined_withDifferentStndDefinedNamespace.json
new file mode 100644 (file)
index 0000000..0a9604d
--- /dev/null
@@ -0,0 +1,108 @@
+
+{
+   "eventList": [
+      {
+         "commonEventHeader": {
+            "version": "4.1",
+            "vesEventListenerVersion": "7.2",
+            "domain": "stndDefined",
+            "eventId": "stndDefined-gNB_Nokia000001",
+            "eventName": "stndDefined-gNB-Nokia-PowerLost",
+            "stndDefinedNamespace": "3GPP-FaultSupervision",
+            "startEpochMicrosec": 1413378172000000,
+            "lastEpochMicrosec": 1413378172000000,
+            "reportingEntityName": "ibcx0001vm002oam001",
+            "sourceName": "scfx0001vm002cap001",
+            "sequence": 1,
+            "priority": "High"
+         },
+         "stndDefinedFields": {
+            "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+            "data": {
+               "href": 1,
+               "uri": "1",
+               "notificationId": 1,
+               "notificationType": "notifyNewAlarm",
+               "eventTime": "xyz",
+               "systemDN": "xyz",
+               "probableCause": 1,
+               "perceivedSeverity": "INDETERMINATE",
+               "rootCauseIndicator": false,
+               "specificProblem": "xyz",
+               "correlatedNotifications": [],
+               "backedUpStatus": true,
+               "backUpObject": "xyz",
+               "trendIndication": "MORE_SEVERE",
+               "thresholdInfo": {
+                  "observedMeasurement": "new",
+                  "observedValue": 123
+               },
+               "stateChangeDefinition": {
+               },
+               "monitoredAttributes": {
+                  "newAtt": "new"
+               },
+               "proposedRepairActions": "xyz",
+               "additionalText": "xyz",
+               "additionalInformation": {
+                  "addInfo": "new"
+               },
+               "alarmId": "1",
+               "alarmType": "COMMUNICATIONS_ALARM"
+            },
+            "stndDefinedFieldsVersion": "1.0"
+         }},
+      {
+         "commonEventHeader": {
+            "version": "4.1",
+            "vesEventListenerVersion": "7.2",
+            "domain": "stndDefined",
+            "eventId": "stndDefined-gNB_Nokia000001",
+            "eventName": "stndDefined-gNB-Nokia-PowerLost",
+            "stndDefinedNamespace": "3GPP-FaultSupervision2",
+            "startEpochMicrosec": 1413378172000000,
+            "lastEpochMicrosec": 1413378172000000,
+            "reportingEntityName": "ibcx0001vm002oam001",
+            "sourceName": "scfx0001vm002cap001",
+            "sequence": 1,
+            "priority": "High"
+         },
+         "stndDefinedFields": {
+            "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+            "data": {
+               "href": 1,
+               "uri": "1",
+               "notificationId": 1,
+               "notificationType": "notifyNewAlarm",
+               "eventTime": "xyz",
+               "systemDN": "xyz",
+               "probableCause": 1,
+               "perceivedSeverity": "INDETERMINATE",
+               "rootCauseIndicator": false,
+               "specificProblem": "xyz",
+               "correlatedNotifications": [],
+               "backedUpStatus": true,
+               "backUpObject": "xyz",
+               "trendIndication": "MORE_SEVERE",
+               "thresholdInfo": {
+                  "observedMeasurement": "new",
+                  "observedValue": 123
+               },
+               "stateChangeDefinition": {
+               },
+               "monitoredAttributes": {
+                  "newAtt": "new"
+               },
+               "proposedRepairActions": "xyz",
+               "additionalText": "xyz",
+               "additionalInformation": {
+                  "addInfo": "new"
+               },
+               "alarmId": "1",
+               "alarmType": "COMMUNICATIONS_ALARM"
+            },
+            "stndDefinedFieldsVersion": "1.0"
+         }}
+   ]
+}
+
diff --git a/src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json b/src/test/resources/ves7_batch_stdnDefined_withSameStndDefinedNamespace.json
new file mode 100644 (file)
index 0000000..7e095d5
--- /dev/null
@@ -0,0 +1,108 @@
+
+{
+   "eventList": [
+      {
+         "commonEventHeader": {
+            "version": "4.1",
+            "vesEventListenerVersion": "7.2",
+            "domain": "stndDefined",
+            "eventId": "stndDefined-gNB_Nokia000001",
+            "eventName": "stndDefined-gNB-Nokia-PowerLost",
+            "stndDefinedNamespace": "3GPP-FaultSupervision",
+            "startEpochMicrosec": 1413378172000000,
+            "lastEpochMicrosec": 1413378172000000,
+            "reportingEntityName": "ibcx0001vm002oam001",
+            "sourceName": "scfx0001vm002cap001",
+            "sequence": 1,
+            "priority": "High"
+         },
+         "stndDefinedFields": {
+            "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+            "data": {
+               "href": 1,
+               "uri": "1",
+               "notificationId": 1,
+               "notificationType": "notifyNewAlarm",
+               "eventTime": "xyz",
+               "systemDN": "xyz",
+               "probableCause": 1,
+               "perceivedSeverity": "INDETERMINATE",
+               "rootCauseIndicator": false,
+               "specificProblem": "xyz",
+               "correlatedNotifications": [],
+               "backedUpStatus": true,
+               "backUpObject": "xyz",
+               "trendIndication": "MORE_SEVERE",
+               "thresholdInfo": {
+                  "observedMeasurement": "new",
+                  "observedValue": 123
+               },
+               "stateChangeDefinition": {
+               },
+               "monitoredAttributes": {
+                  "newAtt": "new"
+               },
+               "proposedRepairActions": "xyz",
+               "additionalText": "xyz",
+               "additionalInformation": {
+                  "addInfo": "new"
+               },
+               "alarmId": "1",
+               "alarmType": "COMMUNICATIONS_ALARM"
+            },
+            "stndDefinedFieldsVersion": "1.0"
+         }},
+      {
+         "commonEventHeader": {
+            "version": "4.1",
+            "vesEventListenerVersion": "7.2",
+            "domain": "stndDefined",
+            "eventId": "stndDefined-gNB_Nokia000001",
+            "eventName": "stndDefined-gNB-Nokia-PowerLost",
+            "stndDefinedNamespace": "3GPP-FaultSupervision",
+            "startEpochMicrosec": 1413378172000000,
+            "lastEpochMicrosec": 1413378172000000,
+            "reportingEntityName": "ibcx0001vm002oam001",
+            "sourceName": "scfx0001vm002cap001",
+            "sequence": 1,
+            "priority": "High"
+         },
+         "stndDefinedFields": {
+            "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+            "data": {
+               "href": 1,
+               "uri": "1",
+               "notificationId": 1,
+               "notificationType": "notifyNewAlarm",
+               "eventTime": "xyz",
+               "systemDN": "xyz",
+               "probableCause": 1,
+               "perceivedSeverity": "INDETERMINATE",
+               "rootCauseIndicator": false,
+               "specificProblem": "xyz",
+               "correlatedNotifications": [],
+               "backedUpStatus": true,
+               "backUpObject": "xyz",
+               "trendIndication": "MORE_SEVERE",
+               "thresholdInfo": {
+                  "observedMeasurement": "new",
+                  "observedValue": 123
+               },
+               "stateChangeDefinition": {
+               },
+               "monitoredAttributes": {
+                  "newAtt": "new"
+               },
+               "proposedRepairActions": "xyz",
+               "additionalText": "xyz",
+               "additionalInformation": {
+                  "addInfo": "new"
+               },
+               "alarmId": "1",
+               "alarmType": "COMMUNICATIONS_ALARM"
+            },
+            "stndDefinedFieldsVersion": "1.0"
+         }}
+   ]
+}
+
diff --git a/src/test/resources/ves7_batch_valid_two_different_domain.json b/src/test/resources/ves7_batch_valid_two_different_domain.json
new file mode 100644 (file)
index 0000000..648c81c
--- /dev/null
@@ -0,0 +1,90 @@
+
+{
+  "eventList": [
+    {
+      "commonEventHeader": {
+        "version": "4.0.1",
+        "vesEventListenerVersion": "7.0.1",
+        "domain": "fault",
+        "eventName": "Fault_Vscf:Acs-Ericcson_PilotNumberPoolExhaustion",
+        "eventId": "fault0000250",
+        "sequence": 1,
+        "priority": "High",
+        "reportingEntityId": "cc305d54-75b4-431b-adb2-eb6b9e541234",
+        "reportingEntityName": "ibcx0001vm002oam0011234",
+        "sourceId": "de305d54-75b4-431b-adb2-eb6b9e546014",
+        "sourceName": "scfx0001vm002cap001",
+        "nfVendorName": "Ericsson",
+        "nfNamingCode": "scfx",
+        "nfcNamingCode": "ssc",
+        "startEpochMicrosec": 1413378172000000,
+        "lastEpochMicrosec": 1413378172000000,
+        "timeZoneOffset": "UTC-05:30"
+      },
+      "faultFields": {
+        "faultFieldsVersion": "4.0",
+        "alarmCondition": "PilotNumberPoolExhaustion",
+        "eventSourceType": "other",
+        "specificProblem": "Calls cannot complete - pilot numbers are unavailable",
+        "eventSeverity": "CRITICAL",
+        "vfStatus": "Active",
+        "alarmAdditionalInformation": {
+          "PilotNumberPoolSize": "1000"
+        }
+      }
+    },
+    {
+      "commonEventHeader": {
+        "version": "4.1",
+        "vesEventListenerVersion": "7.2",
+        "domain": "stndDefined",
+        "eventId": "stndDefined-gNB_Nokia000001",
+        "eventName": "stndDefined-gNB-Nokia-PowerLost",
+        "stndDefinedNamespace": "3GPP-FaultSupervision",
+        "startEpochMicrosec": 1413378172000000,
+        "lastEpochMicrosec": 1413378172000000,
+        "reportingEntityName": "ibcx0001vm002oam001",
+        "sourceName": "scfx0001vm002cap001",
+        "sequence": 1,
+        "priority": "High"
+      },
+      "stndDefinedFields": {
+        "schemaReference": "https://forge.3gpp.org/rep/sa5/MnS/blob/SA88-Rel16/OpenAPI/faultMnS.yaml#components/schemas/NotifyNewAlarm",
+        "data": {
+          "href": 1,
+          "uri": "1",
+          "notificationId": 1,
+          "notificationType": "notifyNewAlarm",
+          "eventTime": "xyz",
+          "systemDN": "xyz",
+          "probableCause": 1,
+          "perceivedSeverity": "INDETERMINATE",
+          "rootCauseIndicator": false,
+          "specificProblem": "xyz",
+          "correlatedNotifications": [],
+          "backedUpStatus": true,
+          "backUpObject": "xyz",
+          "trendIndication": "MORE_SEVERE",
+          "thresholdInfo": {
+            "observedMeasurement": "new",
+            "observedValue": 123
+          },
+          "stateChangeDefinition": {
+          },
+          "monitoredAttributes": {
+            "newAtt": "new"
+          },
+          "proposedRepairActions": "xyz",
+          "additionalText": "xyz",
+          "additionalInformation": {
+            "addInfo": "new"
+          },
+          "alarmId": "1",
+          "alarmType": "COMMUNICATIONS_ALARM"
+        },
+        "stndDefinedFieldsVersion": "1.0"
+      }
+    }
+  ]
+}
+
index aede13c..e9e5596 100644 (file)
@@ -1,6 +1,6 @@
 major=1
-minor=9
-patch=2
+minor=10
+patch=0
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}
 snapshot_version=${base_version}-SNAPSHOT