Cm Avc Event to have same key 21/139321/1
authormpriyank <priyank.maheshwari@est.tech>
Mon, 4 Nov 2024 15:59:58 +0000 (15:59 +0000)
committermpriyank <priyank.maheshwari@est.tech>
Mon, 4 Nov 2024 16:03:39 +0000 (16:03 +0000)
- incoming Cm Avc Event from DMI Plugin is consumed and forwarded to
  target topic
- the key from source topic to be used in the target topic while
  forwarding
- with same key the ordering of the message will be preserved
- NOTE: the RTD related changes will be a separate patchset

Issue-ID: CPS-2436
Change-Id: Ie692663706b378022ec0d621d92ca5054bad8d1b
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumer.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/cmnotificationsubscription/cmavc/CmAvcEventConsumerSpec.groovy
integration-test/src/test/groovy/org/onap/cps/integration/functional/ncmp/CmHandleCreateSpec.groovy
k6-tests/once-off-test/kafka/produce-avc-event.js

index 9e90eab..2d1f648 100644 (file)
@@ -46,7 +46,8 @@ public class CmAvcEventConsumer {
     private final EventsPublisher<CloudEvent> eventsPublisher;
 
     /**
-     * Incoming AvcEvent in the form of Consumer Record.
+     * Incoming Cm AvcEvent in the form of Consumer Record, it will be forwarded as is to a target topic.
+     * The key from incoming record will be used as key for the target topic as well to preserve the message ordering.
      *
      * @param cmAvcEventAsConsumerRecord Incoming raw consumer record
      */
@@ -55,7 +56,8 @@ public class CmAvcEventConsumer {
     public void consumeAndForward(
             final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
         final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
-        log.debug("Consuming AVC event {} ...", outgoingAvcEvent);
-        eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEvent.getId(), outgoingAvcEvent);
+        final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
+        log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
+        eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
     }
 }
index 06651be..ad5f42e 100644 (file)
@@ -64,6 +64,7 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec {
             cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List<String>)
         and: 'an event is sent'
             def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json')
+            def testEventKey = 'sample-eventid-key'
             def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class)
             def testCloudEventSent = CloudEventBuilder.v1()
                 .withData(jsonObjectMapper.asJsonBytes(testEventSent))
@@ -72,17 +73,19 @@ class CmAvcEventConsumerSpec extends MessagingBaseSpec {
                 .withSource(URI.create('sample-test-source'))
                 .withExtension('correlationid', 'test-cmhandle1').build()
         and: 'event has header information'
-            def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent)
-        when: 'the event is consumed'
+            def consumerRecord = new ConsumerRecord<String, CloudEvent>(cmEventsTopicName, 0, 0, testEventKey, testCloudEventSent)
+        when: 'the event is consumed and forwarded to target topic'
             acvEventConsumer.consumeAndForward(consumerRecord)
-        and: 'the topic is polled'
+        and: 'the target topic is polled'
             def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
             assert records.size() == 1
-        and: 'record can be converted to AVC event'
+        and: 'target record can be converted to AVC event'
             def record = records.iterator().next()
             def cloudEvent = record.value() as CloudEvent
             def convertedAvcEvent = toTargetEvent(cloudEvent, AvcEvent.class)
+        and: 'the target event has the same key as the source event to maintain the ordering in a partition'
+            assert record.key() == consumerRecord.key()
         and: 'we have correct headers forwarded where correlation id matches'
             assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
         and: 'event id is same between consumed and forwarded'
index 00ce38f..ffcba02 100644 (file)
@@ -242,7 +242,7 @@ class CmHandleCreateSpec extends CpsIntegrationSpecBase {
             if (retryAttempts == 0)
                 break
         }
-        consumerRecords
+        return consumerRecords
     }
 
 }
index 981a21a..db222f6 100644 (file)
@@ -50,6 +50,11 @@ export const options = {
     }
 };
 
+const getRandomNetworkElement = () => {
+    const networkElementIds = Array.from({ length: 10 }, (_, i) => `neType-${i + 1}`);
+    return networkElementIds[Math.floor(Math.random() * networkElementIds.length)];
+};
+
 function getCloudEventHeaders() {
     return {
         ce_type: 'org.onap.cps.ncmp.events.avc1_0_0.AvcEvent',
@@ -65,10 +70,11 @@ function getCloudEventHeaders() {
 
 export function sendKafkaMessages() {
     const cloudEventHeaders = getCloudEventHeaders();
+    const networkElementId = getRandomNetworkElement();
 
     const avcCloudEvent = {
         key: schemaRegistry.serialize({
-            data: cloudEventHeaders.ce_correlationid,
+            data: networkElementId,
             schemaType: SCHEMA_TYPE_STRING,
         }),
         value: schemaRegistry.serialize({