LcmEvent to have header now 44/134544/7
authormpriyank <priyank.maheshwari@est.tech>
Thu, 11 May 2023 12:55:55 +0000 (13:55 +0100)
committerPriyank Maheshwari <priyank.maheshwari@est.tech>
Tue, 16 May 2023 10:42:40 +0000 (10:42 +0000)
- Introduce LcmEventHeader as per our agreement with stakeholders
- The version remains v1 as we just add the header and dont change
  anything in the existing event payload.
- Later we will remove the header fields from the event payload.
- tests modification and little code refractor for dmi data avc as well

Issue-ID: CPS-1695
Change-Id: Ibef1138a6d0cc7ffec50b4c201a4d3417b99e27e
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
14 files changed:
cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json [moved from cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-headers-v1.json with 95% similarity]
cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json [new file with mode: 0644]
cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-schema-v1.json [moved from cps-ncmp-events/src/main/resources/schemas/lcm-event-schema-v1.json with 97% similarity]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java [new file with mode: 0644]
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImpl.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreator.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorHelper.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsService.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsCreatorSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsPublisherSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventsServiceSpec.groovy
cps-ncmp-service/src/test/resources/expectedLcmEvent.json

@@ -1,6 +1,6 @@
 {
   "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-headers-schema:v1",
+  "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-header-schema:v1",
   "$ref": "#/definitions/AvcEventHeader",
   "definitions": {
     "AvcEventHeader": {
diff --git a/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json b/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json
new file mode 100644 (file)
index 0000000..8c9922e
--- /dev/null
@@ -0,0 +1,56 @@
+{
+
+  "$schema": "https://json-schema.org/draft/2019-09/schema",
+  "$id": "urn:cps:org.onap.ncmp.cmhandle.lcm-event-header:v1",
+  "$ref": "#/definitions/LcmEventHeader",
+
+  "definitions": {
+    "LcmEventHeader": {
+      "description": "The header for LCM event",
+      "type": "object",
+      "javaType" : "org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader",
+      "properties": {
+        "eventId": {
+          "description": "The unique id identifying the event",
+          "type": "string"
+        },
+        "eventCorrelationId": {
+          "description": "The id identifying the event",
+          "type": "string"
+        },
+        "eventTime": {
+          "description": "The timestamp when original event occurred",
+          "type": "string"
+        },
+        "eventSource": {
+          "description": "The source of the event",
+          "type": "string"
+        },
+        "eventType": {
+          "description": "The type of the event",
+          "type": "string"
+        },
+        "eventSchema": {
+          "description": "The schema that this event adheres to",
+          "type": "string"
+        },
+        "eventSchemaVersion": {
+          "description": "The version of the schema that this event adheres to",
+          "type": "string"
+        }
+      },
+      "required": [
+        "eventId",
+        "eventCorrelationId",
+        "eventTime",
+        "eventSource",
+        "eventType",
+        "eventSchema",
+        "eventSchemaVersion",
+        "event"
+      ],
+      "additionalProperties": false
+    }
+
+  }
+}
@@ -55,7 +55,7 @@
     "LcmEvent": {
       "description": "The payload for LCM event",
       "type": "object",
-      "javaType" : "org.onap.ncmp.cmhandle.event.lcm.LcmEvent",
+      "javaType" : "org.onap.cps.ncmp.events.lcm.v1.LcmEvent",
       "properties": {
         "eventId": {
           "description": "The unique id identifying the event",
index 4c84629..b0b091a 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events;
 
+import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
+import org.springframework.util.SerializationUtils;
 import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.util.concurrent.ListenableFutureCallback;
 
@@ -70,6 +73,20 @@ public class EventsPublisher<T> {
         eventFuture.addCallback(handleCallback(topicName));
     }
 
+    /**
+     * Generic Event Publisher with headers.
+     *
+     * @param topicName    valid topic name
+     * @param eventKey     message key
+     * @param eventHeaders map of event headers
+     * @param event        message payload
+     */
+    public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
+            final T event) {
+
+        publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
+    }
+
     private ListenableFutureCallback<SendResult<String, T>> handleCallback(final String topicName) {
         return new ListenableFutureCallback<>() {
             @Override
@@ -85,4 +102,10 @@ public class EventsPublisher<T> {
         };
     }
 
+    private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
+        final Headers eventHeaders = new RecordHeaders();
+        eventMessageHeaders.forEach((key, value) -> eventHeaders.add(key, SerializationUtils.serialize(value)));
+        return eventHeaders;
+    }
+
 }
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/lcm/LcmEventHeaderMapper.java
new file mode 100644 (file)
index 0000000..f7707d9
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.events.lcm;
+
+import org.mapstruct.Mapper;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+
+@Mapper(componentModel = "spring")
+public interface LcmEventHeaderMapper {
+
+    /**
+     * Mapper for converting incoming {@link LcmEvent} to outgoing {@link LcmEventHeader}.
+     */
+
+    LcmEventHeader toLcmEventHeader(LcmEvent lcmEvent);
+
+}
index 9d51843..f42cd39 100644 (file)
@@ -43,7 +43,8 @@ import org.onap.cps.ncmp.api.inventory.CompositeState;
 import org.onap.cps.ncmp.api.inventory.CompositeStateUtils;
 import org.onap.cps.ncmp.api.inventory.InventoryPersistence;
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
@@ -76,7 +77,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
 
     @Override
     @Timed(value = "cps.ncmp.cmhandle.state.update.batch",
-        description = "Time taken to update a batch of cm handle states")
+            description = "Time taken to update a batch of cm handle states")
     public void updateCmHandleStateBatch(final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle) {
         final Collection<CmHandleTransitionPair> cmHandleTransitionPairs =
                 prepareCmHandleTransitionBatch(cmHandleStatePerCmHandle);
@@ -106,9 +107,12 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
     private void publishLcmEvent(final NcmpServiceCmHandle targetNcmpServiceCmHandle,
             final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
         final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId();
+        final LcmEventHeader lcmEventHeader =
+                lcmEventsCreator.populateLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle,
+                        existingNcmpServiceCmHandle);
         final LcmEvent lcmEvent =
                 lcmEventsCreator.populateLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
-        lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent);
+        lcmEventsService.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader);
     }
 
     private Collection<CmHandleTransitionPair> prepareCmHandleTransitionBatch(
@@ -221,6 +225,7 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState
     @Setter
     @NoArgsConstructor
     static class CmHandleTransitionPair {
+
         private YangModelCmHandle currentYangModelCmHandle;
         private YangModelCmHandle targetYangModelCmHandle;
     }
index a72e664..3c7c92b 100644 (file)
@@ -23,13 +23,15 @@ package org.onap.cps.ncmp.api.impl.events.lcm;
 import java.util.UUID;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.impl.utils.EventDateTimeFormatter;
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.Event;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
-import org.onap.ncmp.cmhandle.event.lcm.Values;
+import org.onap.cps.ncmp.events.lcm.v1.Event;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
 import org.springframework.stereotype.Component;
 
 
@@ -38,8 +40,11 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
+@RequiredArgsConstructor
 public class LcmEventsCreator {
 
+    private final LcmEventHeaderMapper lcmEventHeaderMapper;
+
     /**
      * Populate Lifecycle Management Event.
      *
@@ -53,6 +58,20 @@ public class LcmEventsCreator {
         return createLcmEvent(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
     }
 
+    /**
+     * Populate Lifecycle Management Event Header.
+     *
+     * @param cmHandleId                  cm handle identifier
+     * @param targetNcmpServiceCmHandle   target ncmp service cmhandle
+     * @param existingNcmpServiceCmHandle existing ncmp service cmhandle
+     * @return Populated LcmEventHeader
+     */
+    public LcmEventHeader populateLcmEventHeader(final String cmHandleId,
+            final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+            final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
+        return createLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
+    }
+
     private LcmEvent createLcmEvent(final String cmHandleId, final NcmpServiceCmHandle targetNcmpServiceCmHandle,
             final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
         final LcmEventType lcmEventType =
@@ -63,6 +82,15 @@ public class LcmEventsCreator {
         return lcmEvent;
     }
 
+    private LcmEventHeader createLcmEventHeader(final String cmHandleId,
+            final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+            final NcmpServiceCmHandle existingNcmpServiceCmHandle) {
+        final LcmEventType lcmEventType =
+                LcmEventsCreatorHelper.determineEventType(targetNcmpServiceCmHandle, existingNcmpServiceCmHandle);
+        final LcmEvent lcmEventWithHeaderInformation = lcmEventHeader(cmHandleId, lcmEventType);
+        return lcmEventHeaderMapper.toLcmEventHeader(lcmEventWithHeaderInformation);
+    }
+
     private Event lcmEventPayload(final String eventCorrelationId, final NcmpServiceCmHandle targetNcmpServiceCmHandle,
             final NcmpServiceCmHandle existingNcmpServiceCmHandle, final LcmEventType lcmEventType) {
         final Event event = new Event();
index 1322b72..d3b45d4 100644 (file)
@@ -34,7 +34,7 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle;
-import org.onap.ncmp.cmhandle.event.lcm.Values;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
 
 /**
  * LcmEventsCreatorHelper has helper methods to create LcmEvent.
index f258b45..2e1b914 100644 (file)
 package org.onap.cps.ncmp.api.impl.events.lcm;
 
 import io.micrometer.core.annotation.Timed;
+import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
+import org.onap.cps.utils.JsonObjectMapper;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.KafkaException;
 import org.springframework.stereotype.Service;
@@ -39,6 +42,7 @@ import org.springframework.stereotype.Service;
 public class LcmEventsService {
 
     private final EventsPublisher<LcmEvent> eventsPublisher;
+    private final JsonObjectMapper jsonObjectMapper;
 
     @Value("${app.lcm.events.topic:ncmp-events}")
     private String topicName;
@@ -47,17 +51,19 @@ public class LcmEventsService {
     private boolean notificationsEnabled;
 
     /**
-     * Publish the LcmEvent to the public topic.
+     * Publish the LcmEvent with header to the public topic.
      *
-     * @param cmHandleId Cm Handle Id
-     * @param lcmEvent  Lcm Event
+     * @param cmHandleId     Cm Handle Id
+     * @param lcmEvent       Lcm Event
+     * @param lcmEventHeader Lcm Event Header
      */
-    @Timed(value = "cps.ncmp.lcm.events.publish",
-        description = "Time taken to publish a LCM event")
-    public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent) {
+    @Timed(value = "cps.ncmp.lcm.events.publish", description = "Time taken to publish a LCM event")
+    public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) {
         if (notificationsEnabled) {
             try {
-                eventsPublisher.publishEvent(topicName, cmHandleId, lcmEvent);
+                final Map<String, Object> lcmEventHeadersMap =
+                        jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
+                eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
             } catch (final KafkaException e) {
                 log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage());
             }
index f660be7..e449d65 100644 (file)
@@ -54,7 +54,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
         then: 'state is saved using inventory persistence'
             expectedCallsToInventoryPersistence * mockInventoryPersistence.saveCmHandleState(cmHandleId, _)
         and: 'event service is called to publish event'
-            expectedCallsToEventService * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+            expectedCallsToEventService * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
         where: 'state change parameters are provided'
             stateChange          | fromCmHandleState | toCmHandleState || expectedCallsToInventoryPersistence | expectedCallsToEventService
             'ADVISED to READY'   | ADVISED           | READY           || 1                                   | 1
@@ -73,7 +73,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
         then: 'state is saved using inventory persistence'
             1 * mockInventoryPersistence.saveCmHandle(yangModelCmHandle)
         and: 'event service is called to publish event'
-            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
     }
 
     def 'Update and Publish Events on State Change from LOCKED to ADVISED'() {
@@ -90,7 +90,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
                 }
             }
         and: 'event service is called to publish event'
-            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
     }
 
     def 'Update and Publish Events on State Change to READY'() {
@@ -111,7 +111,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
                 }
             }
         and: 'event service is called to publish event'
-            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
     }
 
     def 'Update cmHandle state to "DELETING"' (){
@@ -125,7 +125,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
         and: 'method to persist cm handle state is called once'
             1 * mockInventoryPersistence.saveCmHandleState(yangModelCmHandle.getId(), yangModelCmHandle.getCompositeState())
         and: 'the method to publish Lcm event is called once'
-            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
     }
 
     def 'Update cmHandle state to "DELETED"' (){
@@ -137,7 +137,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
         then: 'the cm handle state is as expected'
             yangModelCmHandle.getCompositeState().getCmHandleState() == DELETED
         and: 'the method to publish Lcm event is called once'
-            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _)
+            1 * mockLcmEventsService.publishLcmEvent(cmHandleId, _, _)
     }
 
     def 'No state change and no event to be published'() {
@@ -167,7 +167,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
                 }
             }
         and: 'event service is called to publish event'
-            2 * mockLcmEventsService.publishLcmEvent(_, _)
+            2 * mockLcmEventsService.publishLcmEvent(_, _, _)
 
     }
 
@@ -183,7 +183,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification {
                 }
             }
         and: 'event service is called to publish event'
-            2 * mockLcmEventsService.publishLcmEvent(_, _)
+            2 * mockLcmEventsService.publishLcmEvent(_, _, _)
 
     }
 
index f4adfc5..6d7d625 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.events.lcm
 
+import org.mapstruct.factory.Mappers
 import org.onap.cps.ncmp.api.inventory.CmHandleState
 import org.onap.cps.ncmp.api.inventory.CompositeState
 import org.onap.cps.ncmp.api.models.NcmpServiceCmHandle
-import org.onap.ncmp.cmhandle.event.lcm.Values
+import org.onap.cps.ncmp.events.lcm.v1.Values
 import spock.lang.Specification
 
 import static org.onap.cps.ncmp.api.inventory.CmHandleState.ADVISED
@@ -32,7 +33,9 @@ import static org.onap.cps.ncmp.api.inventory.CmHandleState.READY
 
 class LcmEventsCreatorSpec extends Specification {
 
-    def objectUnderTest = new LcmEventsCreator()
+    LcmEventHeaderMapper lcmEventsHeaderMapper = Mappers.getMapper(LcmEventHeaderMapper)
+
+    def objectUnderTest = new LcmEventsCreator(lcmEventsHeaderMapper)
     def cmHandleId = 'test-cm-handle'
 
     def 'Map the LcmEvent for #operation'() {
@@ -159,4 +162,15 @@ class LcmEventsCreatorSpec extends Specification {
             'null to null'   | null                       | null
 
     }
+
+    def 'Map the LcmEventHeader'() {
+        given: 'NCMP cm handle details with current and old details'
+            def existingNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: ADVISED))
+            def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: READY))
+        when: 'the event header is populated'
+            def result = objectUnderTest.populateLcmEventHeader(cmHandleId, targetNcmpServiceCmHandle, existingNcmpServiceCmHandle)
+        then: 'the header has fields populated'
+            assert result.eventCorrelationId == cmHandleId
+            assert result.eventId != null
+    }
 }
\ No newline at end of file
index 7c9464d..9374126 100644 (file)
@@ -24,14 +24,15 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.events.lcm.v1.Event
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
 import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
-import org.onap.ncmp.cmhandle.event.lcm.Event
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent
 import org.spockframework.spring.SpringBean
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
+import org.springframework.util.SerializationUtils
 import org.testcontainers.spock.Testcontainers
 
 import java.time.Duration
@@ -55,19 +56,35 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
     def 'Produce and Consume Lcm Event'() {
         given: 'event key and event data'
             def eventKey = 'lcm'
+            def eventId = 'test-uuid'
+            def eventCorrelationId = 'cmhandle-test'
+            def eventSource = 'org.onap.ncmp'
+            def eventTime = '2022-12-31T20:30:40.000+0000'
+            def eventType = 'org.onap.ncmp.cmhandle.lcm.event'
+            def eventSchema = 'org.onap.ncmp.cmhandle.lcm.event'
+            def eventSchemaVersion = 'v1'
             def eventData = new LcmEvent(
-                eventId: 'test-uuid',
-                eventCorrelationId: 'cmhandle-as-correlationid',
-                eventSource: 'org.onap.ncmp',
-                eventTime: '2022-12-31T20:30:40.000+0000',
-                eventType: 'org.onap.ncmp.cmhandle.lcm.event',
-                eventSchema: 'org.onap.ncmp.cmhandle.lcm.event',
-                eventSchemaVersion: 'v1',
+                eventId: eventId,
+                eventCorrelationId: eventCorrelationId,
+                eventSource: eventSource,
+                eventTime: eventTime,
+                eventType: eventType,
+                eventSchema: eventSchema,
+                eventSchemaVersion: eventSchemaVersion,
                 event: new Event(cmHandleId: 'cmhandle-test'))
+        and: 'we have a event header'
+            def eventHeader = [
+                eventId           : eventId,
+                eventCorrelationId: eventCorrelationId,
+                eventSource       : eventSource,
+                eventTime         : eventTime,
+                eventType         : eventType,
+                eventSchema       : eventSchema,
+                eventSchemaVersion: eventSchemaVersion]
         and: 'consumer has a subscription'
             kafkaConsumer.subscribe([testTopic] as List<String>)
         when: 'an event is published'
-            lcmEventsPublisher.publishEvent(testTopic, eventKey, eventData)
+            lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
         and: 'topic is polled'
             def records = kafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
@@ -79,5 +96,8 @@ class LcmEventsPublisherSpec extends MessagingBaseSpec {
             def expectedJsonString = TestUtils.getResourceFileContent('expectedLcmEvent.json')
             def expectedLcmEvent = jsonObjectMapper.convertJsonString(expectedJsonString, LcmEvent.class)
             assert expectedLcmEvent == jsonObjectMapper.convertJsonString(record.value, LcmEvent.class)
+        and: 'record header matches the expected parameters'
+            assert SerializationUtils.deserialize(record.headers().lastHeader('eventId').value()) == eventId
+            assert SerializationUtils.deserialize(record.headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId
     }
 }
index 65f4d50..2d3f8ac 100644 (file)
 package org.onap.cps.ncmp.api.impl.events.lcm
 
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
-import org.onap.ncmp.cmhandle.event.lcm.LcmEvent
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
+import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader
+import org.onap.cps.utils.JsonObjectMapper
 import org.springframework.kafka.KafkaException
 import spock.lang.Specification
 
 class LcmEventsServiceSpec extends Specification {
 
     def mockLcmEventsPublisher = Mock(EventsPublisher)
+    def mockJsonObjectMapper = Mock(JsonObjectMapper)
 
-    def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher)
+    def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper)
 
     def 'Create and Publish lcm event where events are #scenario'() {
         given: 'a cm handle id and Lcm Event'
             def cmHandleId = 'test-cm-handle-id'
-            def lcmEvent = new LcmEvent(eventId: UUID.randomUUID().toString(), eventCorrelationId: cmHandleId)
+            def eventId = UUID.randomUUID().toString()
+            def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId)
+        and: 'we also have a lcm event header'
+            def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
         and: 'notificationsEnabled is #notificationsEnabled and it will be true as default'
             objectUnderTest.notificationsEnabled = notificationsEnabled
+        and: 'lcm event header is transformed to headers map'
+            mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId]
         when: 'service is called to publish lcm event'
-            objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent)
+            objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader)
         then: 'publisher is called #expectedTimesMethodCalled times'
-            expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, lcmEvent)
+            expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, _, lcmEvent) >> {
+                args -> {
+                    def eventHeaders = (args[2] as Map<String,Object>)
+                    assert eventHeaders.containsKey('eventId')
+                    assert eventHeaders.containsKey('eventCorrelationId')
+                    assert eventHeaders.get('eventId') == eventId
+                    assert eventHeaders.get('eventCorrelationId') == cmHandleId
+                }
+            }
         where: 'the following values are used'
             scenario   | notificationsEnabled || expectedTimesMethodCalled
             'enabled'  | true                 || 1
@@ -50,12 +66,14 @@ class LcmEventsServiceSpec extends Specification {
     def 'Unable to send message'(){
         given: 'a cm handle id and Lcm Event and notification enabled'
             def cmHandleId = 'test-cm-handle-id'
-            def lcmEvent = new LcmEvent(eventId: UUID.randomUUID().toString(), eventCorrelationId: cmHandleId)
+            def eventId = UUID.randomUUID().toString()
+            def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId)
+            def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
             objectUnderTest.notificationsEnabled = true
         when: 'publisher set to throw an exception'
-            mockLcmEventsPublisher.publishEvent(*_) >> { throw new KafkaException('publishing failed')}
+            mockLcmEventsPublisher.publishEvent(_, _, _, _) >> { throw new KafkaException('publishing failed')}
         and: 'an event is publised'
-            objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent)
+            objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
         then: 'the exception is just logged and not bubbled up'
             noExceptionThrown()
     }
index 1db16ee..20d557d 100644 (file)
@@ -1,6 +1,6 @@
 {
   "eventId": "test-uuid",
-  "eventCorrelationId": "cmhandle-as-correlationid",
+  "eventCorrelationId": "cmhandle-test",
   "eventTime": "2022-12-31T20:30:40.000+0000",
   "eventSource": "org.onap.ncmp",
   "eventType": "org.onap.ncmp.cmhandle.lcm.event",