DMI Plugins respond back to NCMP Events Schema 51/134951/13
authorseanbeirne <sean.beirne@est.tech>
Mon, 19 Jun 2023 13:42:28 +0000 (14:42 +0100)
committerSean Beirne <sean.beirne@est.tech>
Wed, 26 Jul 2023 09:42:58 +0000 (09:42 +0000)
- Introduced Cloud Events for Subscription Response
- Created SubscriptionEventResponseMapper
- Created CloudEventContructionException for DMI
- Modified Folder Structure for Subscriptions

Issue-ID: CPS-1738
Signed-off-by: seanbeirne <sean.beirne@est.tech>
Change-Id: I242926fb3e6ceb8e6a6ca23bfbd794feacdeaf20

src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
src/main/java/org/onap/cps/ncmp/dmi/exception/CloudEventConstructionException.java [moved from src/main/java/org/onap/cps/ncmp/dmi/service/model/SubscriptionEventResponse.java with 64% similarity]
src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumer.java [deleted file]
src/main/java/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventConsumer.java [new file with mode: 0644]
src/main/java/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventResponseMapper.java [new file with mode: 0644]
src/main/java/org/onap/cps/ncmp/dmi/service/model/SubscriptionEventResponseStatus.java [deleted file]
src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy [deleted file]
src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventConsumerSpec.groovy [new file with mode: 0644]
src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventResponseMapperSpec.groovy [new file with mode: 0644]
src/test/resources/avcSubscriptionCreationEvent.json

index cb617f9..25ee92a 100644 (file)
@@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
 import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -124,4 +125,18 @@ public class KafkaConfig<T> {
         return kafkaTemplate;
     }
 
+    /**
+     * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
+     *
+     * @return an instance of cloud Kafka template.
+     */
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
+        cloudEventConcurrentKafkaListenerContainerFactory() {
+        final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
+            new ConcurrentKafkaListenerContainerFactory<>();
+        containerFactory.setConsumerFactory(cloudEventConsumerFactory());
+        return containerFactory;
+    }
+
 }
  *  ============LICENSE_END=========================================================
  */
 
-package org.onap.cps.ncmp.dmi.service.model;
+package org.onap.cps.ncmp.dmi.exception;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
+public class CloudEventConstructionException extends DmiException {
 
-@JsonInclude(JsonInclude.Include.NON_NULL)
-@Getter
-@Setter
-public class SubscriptionEventResponse {
-    private String clientId;
-    private String subscriptionName;
-    private String dmiName;
-    private Map<String, SubscriptionEventResponseStatus> cmHandleIdToStatus;
+    private static final long serialVersionUID = 7747941311132087621L;
+
+    /**
+     * Constructor.
+     *
+     * @param message the error message
+     * @param details the error details
+     * @param cause   the error cause
+     */
+    public CloudEventConstructionException(final String message, final String details, final Throwable cause) {
+        super(message, details, cause);
+    }
 }
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumer.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumer.java
deleted file mode 100644 (file)
index 2094979..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.notifications.avc;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponse;
-import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponseStatus;
-import org.onap.cps.ncmp.event.model.SubscriptionEvent;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.stereotype.Component;
-
-@Component
-@Slf4j
-@RequiredArgsConstructor
-public class SubscriptionEventConsumer {
-
-    @Value("${app.dmi.avc.subscription-response-topic}")
-    private String cmAvcSubscriptionResponseTopic;
-    @Value("${dmi.service.name}")
-    private String dmiName;
-    private final KafkaTemplate<String, SubscriptionEventResponse> kafkaTemplate;
-
-    /**
-     * Consume the specified event.
-     *
-     * @param subscriptionEvent the event to be consumed
-     */
-    @KafkaListener(topics = "${app.dmi.avc.subscription-topic}",
-            properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"})
-    public void consumeSubscriptionEvent(@Payload final SubscriptionEvent subscriptionEvent,
-                                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final String eventKey,
-                                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final String timeStampReceived) {
-        final Date dateAndTimeReceived = new Date(Long.parseLong(timeStampReceived));
-        final String subscriptionName = subscriptionEvent.getEvent().getSubscription().getName();
-        log.info("Subscription for SubscriptionName {} is received at {} by dmi plugin.", subscriptionName,
-                dateAndTimeReceived);
-        sendSubscriptionResponseMessage(eventKey, formSubscriptionEventResponse(subscriptionEvent));
-    }
-
-    /**
-     * Sends message to the configured topic.
-     *
-     * @param eventKey is the kafka message key
-     * @param subscriptionEventResponse is the payload of the kafka message
-     */
-    public void sendSubscriptionResponseMessage(final String eventKey,
-                                                final SubscriptionEventResponse subscriptionEventResponse) {
-        kafkaTemplate.send(cmAvcSubscriptionResponseTopic, eventKey, subscriptionEventResponse);
-    }
-
-    private SubscriptionEventResponse formSubscriptionEventResponse(final SubscriptionEvent subscriptionEvent) {
-        final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse();
-        subscriptionEventResponse.setClientId(subscriptionEvent.getEvent().getSubscription().getClientID());
-        subscriptionEventResponse.setSubscriptionName(subscriptionEvent.getEvent().getSubscription().getName());
-        subscriptionEventResponse.setDmiName(dmiName);
-        final List<Object> cmHandleIdToCmHandlePropertyMap = subscriptionEvent.getEvent()
-                .getPredicates()
-                .getTargets();
-        subscriptionEventResponse
-                .setCmHandleIdToStatus(populateCmHandleIdToStatus(extractCmHandleIds(cmHandleIdToCmHandlePropertyMap)));
-        return subscriptionEventResponse;
-    }
-
-    private Set<String> extractCmHandleIds(final List<Object> cmHandleIdTocmHandlePropertyMap) {
-        final Set<String> cmHandleIds = new HashSet<>();
-        for (final Object obj: cmHandleIdTocmHandlePropertyMap) {
-            final Map<String, Object> cmHandleIdToPropertiesMap = (Map<String, Object>) obj;
-            cmHandleIds.addAll(cmHandleIdToPropertiesMap.keySet());
-        }
-        return cmHandleIds;
-    }
-
-    private Map<String, SubscriptionEventResponseStatus> populateCmHandleIdToStatus(final Set<String> cmHandleIds) {
-        final Map<String, SubscriptionEventResponseStatus> result = new HashMap<>();
-        for (final String cmHandleId : cmHandleIds) {
-            result.put(cmHandleId, SubscriptionEventResponseStatus.ACCEPTED);
-        }
-        return result;
-    }
-}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventConsumer.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventConsumer.java
new file mode 100644 (file)
index 0000000..d7622fa
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.avcsubscription;
+
+import io.cloudevents.CloudEvent;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+@RequiredArgsConstructor
+public class SubscriptionEventConsumer {
+
+    @Value("${app.dmi.avc.subscription-response-topic}")
+    private String cmAvcSubscriptionResponseTopic;
+    @Value("${dmi.service.name}")
+    private String dmiName;
+    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
+
+    /**
+     * Consume the specified event.
+     *
+     * @param subscriptionCloudEvent the event to be consumed
+     */
+    @KafkaListener(topics = "${app.dmi.avc.subscription-topic}",
+        containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+    public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionCloudEvent) {
+        final SubscriptionEvent subscriptionEvent =
+            SubscriptionEventResponseMapper.toSubscriptionEvent(subscriptionCloudEvent.value());
+        if (subscriptionEvent != null) {
+            final String eventKey = subscriptionCloudEvent.value().getId();
+            final String subscriptionType = subscriptionCloudEvent.value().getType();
+            if ("subscriptionCreated".equals(subscriptionType)) {
+                sendSubscriptionResponseMessage(eventKey, "subscriptionCreatedStatus",
+                    formSubscriptionEventResponse(subscriptionEvent));
+            }
+        }
+    }
+
+    /**
+     * Sends message to the configured topic.
+     *
+     * @param eventKey is the kafka message key
+     * @param subscriptionType is the type of subscription action
+     * @param subscriptionEventResponse is the payload of the kafka message
+     */
+    public void sendSubscriptionResponseMessage(final String eventKey,
+                                                final String subscriptionType,
+                                                final SubscriptionEventResponse subscriptionEventResponse) {
+        cloudEventKafkaTemplate.send(cmAvcSubscriptionResponseTopic, eventKey,
+            SubscriptionEventResponseMapper.toCloudEvent(subscriptionEventResponse, subscriptionType, dmiName));
+    }
+
+    private SubscriptionEventResponse formSubscriptionEventResponse(final SubscriptionEvent subscriptionEvent) {
+        final SubscriptionEventResponse subscriptionEventResponse = new SubscriptionEventResponse();
+        final Data subscriptionResponseData = new Data();
+        subscriptionResponseData.setClientId(subscriptionEvent.getData().getSubscription().getClientID());
+        subscriptionResponseData.setSubscriptionName(subscriptionEvent.getData().getSubscription().getName());
+        subscriptionResponseData.setDmiName(dmiName);
+
+        final List<CmHandle> cmHandles = subscriptionEvent.getData()
+            .getPredicates().getTargets();
+        subscriptionResponseData
+                .setSubscriptionStatus(
+                    populateSubscriptionStatus(
+                        extractCmHandleIds(cmHandles)));
+        subscriptionEventResponse.setData(subscriptionResponseData);
+        return subscriptionEventResponse;
+    }
+
+    private Set<String> extractCmHandleIds(final List<CmHandle> cmHandles) {
+        final Set<String> cmHandleIds = new HashSet<>();
+
+        for (final CmHandle cmHandle : cmHandles) {
+            cmHandleIds.add(cmHandle.getId());
+        }
+        return cmHandleIds;
+    }
+
+    private List<SubscriptionStatus> populateSubscriptionStatus(final Set<String> cmHandleIds) {
+        final List<SubscriptionStatus> subscriptionStatuses = new ArrayList<>();
+        for (final String cmHandleId : cmHandleIds) {
+            final SubscriptionStatus status = new SubscriptionStatus();
+            status.setId(cmHandleId);
+            status.setStatus(SubscriptionStatus.Status.ACCEPTED);
+            subscriptionStatuses.add(status);
+        }
+        return subscriptionStatuses;
+    }
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventResponseMapper.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventResponseMapper.java
new file mode 100644 (file)
index 0000000..d760993
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.avcsubscription;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.CloudEventUtils;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import io.cloudevents.core.data.PojoCloudEventData;
+import io.cloudevents.jackson.PojoCloudEventDataMapper;
+import java.net.URI;
+import java.util.UUID;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.exception.CloudEventConstructionException;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@Slf4j
+public class SubscriptionEventResponseMapper {
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Maps CloudEvent object to SubscriptionEvent object.
+     *
+     * @param cloudEvent object
+     * @return SubscriptionEvent deserialized
+     */
+    public static SubscriptionEvent toSubscriptionEvent(final CloudEvent cloudEvent) {
+        final PojoCloudEventData<SubscriptionEvent> deserializedCloudEvent =
+            CloudEventUtils.mapData(cloudEvent,
+                PojoCloudEventDataMapper.from(objectMapper, SubscriptionEvent.class));
+        if (deserializedCloudEvent == null) {
+            log.debug("No data found in the consumed subscription response event");
+            return null;
+        } else {
+            final SubscriptionEvent subscriptionEvent = deserializedCloudEvent.getValue();
+            log.debug("Consuming subscription response event {}", subscriptionEvent);
+            return subscriptionEvent;
+        }
+    }
+
+    /**
+     * Maps SubscriptionEventResponse to a CloudEvent.
+     *
+     * @param subscriptionEventResponse object.
+     * @param subscriptionType String of subscription type.
+     * @param dmiName String of dmiName.
+     * @return CloudEvent built.
+     */
+    public static CloudEvent toCloudEvent(
+        final org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
+            subscriptionEventResponse,
+        final String subscriptionType,
+        final String dmiName) {
+        try {
+            return CloudEventBuilder.v1().withId(UUID.randomUUID().toString())
+                .withSource(URI.create(dmiName))
+                .withType(subscriptionType)
+                .withDataSchema(URI.create("urn:cps:" + SubscriptionEventResponse.class.getName() + ":1.0.0"))
+                .withExtension("correlationid", subscriptionEventResponse.getData().getClientId() + ":"
+                    + subscriptionEventResponse.getData().getSubscriptionName())
+                .withData(objectMapper.writeValueAsBytes(subscriptionEventResponse))
+                .build();
+        } catch (final Exception ex) {
+            throw new CloudEventConstructionException("The Cloud Event could not be constructed", "Invalid object to "
+                + "serialize or required headers is missing", ex);
+        }
+    }
+
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/service/model/SubscriptionEventResponseStatus.java b/src/main/java/org/onap/cps/ncmp/dmi/service/model/SubscriptionEventResponseStatus.java
deleted file mode 100644 (file)
index 8987dda..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.service.model;
-
-public enum SubscriptionEventResponseStatus {
-    ACCEPTED,
-    REJECTED,
-    PENDING
-}
index f09434b..a3bf52b 100644 (file)
@@ -1,3 +1,23 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
 package org.onap.cps.ncmp.dmi.config.kafka
 
 import io.cloudevents.CloudEvent
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
deleted file mode 100644 (file)
index 59873ec..0000000
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- *  Copyright (C) 2023 Nordix Foundation
- *  ================================================================================
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- *  SPDX-License-Identifier: Apache-2.0
- *  ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.notifications.avc
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.cps.ncmp.dmi.TestUtils
-import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponse
-import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponseStatus
-import org.onap.cps.ncmp.event.model.SubscriptionEvent
-import org.spockframework.spring.SpringBean
-import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.test.annotation.DirtiesContext
-import org.testcontainers.spock.Testcontainers
-
-import java.time.Duration
-
-@SpringBootTest(classes = [SubscriptionEventConsumer])
-@Testcontainers
-@DirtiesContext
-class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
-
-    def objectMapper = new ObjectMapper()
-    def testTopic = 'dmi-ncmp-cm-avc-subscription'
-
-    @SpringBean
-    SubscriptionEventConsumer objectUnderTest = new SubscriptionEventConsumer(kafkaTemplate)
-
-    def 'Sends subscription event response successfully.'() {
-        given: 'an subscription event response'
-            def responseStatus = SubscriptionEventResponseStatus.ACCEPTED
-            def cmHandleIdToStatusMap = ['CmHandle1':responseStatus, 'CmHandle2':responseStatus]
-            def subscriptionEventResponse = new SubscriptionEventResponse(subscriptionName: 'cm-subscription-001',
-                clientId: 'SCO-9989752', dmiName: 'ncmp-dmi-plugin', cmHandleIdToStatus: cmHandleIdToStatusMap)
-            objectUnderTest.cmAvcSubscriptionResponseTopic = testTopic
-        and: 'consumer has a subscription'
-            kafkaConsumer.subscribe([testTopic] as List<String>)
-        when: 'an event is published'
-            def eventKey = UUID.randomUUID().toString()
-            objectUnderTest.sendSubscriptionResponseMessage(eventKey, subscriptionEventResponse)
-        and: 'topic is polled'
-            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
-        then: 'poll returns one record'
-            assert records.size() == 1
-            def record = records.iterator().next()
-        and: 'the record value matches the expected event value'
-            def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
-            assert expectedValue == record.value
-            assert eventKey == record.key
-    }
-
-    def 'Consume valid message.'() {
-        given: 'an event'
-            def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
-            def testEventSent = objectMapper.readValue(jsonData, SubscriptionEvent.class)
-            objectUnderTest.cmAvcSubscriptionResponseTopic = testTopic
-        when: 'the valid event is consumed'
-            def eventKey = UUID.randomUUID().toString()
-            def timeStampReceived = '1679521929511'
-            objectUnderTest.consumeSubscriptionEvent(testEventSent, eventKey, timeStampReceived)
-        then: 'no exception is thrown'
-            noExceptionThrown()
-    }
-
-    def 'Extract cm handle ids from cm handle id to cm handle property map successfully.'() {
-        given: 'a list of cm handle id to cm handle property map'
-            def cmHandleIdToPropertyMap =
-                ['CmHandle1':['prop-x':'prop-valuex'], 'CmHandle2':['prop-y':'prop-valuey']]
-            def listOfCmHandleIdToPropertyMap =
-                [cmHandleIdToPropertyMap]
-        when: 'extract the cm handle ids'
-            def result = objectUnderTest.extractCmHandleIds(listOfCmHandleIdToPropertyMap)
-        then: 'cm handle ids are extracted as expected'
-            def expectedCmHandleIds = ['CmHandle1', 'CmHandle2'] as Set
-            assert expectedCmHandleIds == result
-    }
-
-    def 'Populate cm handle id to status map successfully.'() {
-        given: 'a set of cm handle id'
-            def cmHandleIds = ['CmHandle1', 'CmHandle2'] as Set
-            def responseStatus = SubscriptionEventResponseStatus.ACCEPTED
-        when: 'populate cm handle id to status map'
-            def result = objectUnderTest.populateCmHandleIdToStatus(cmHandleIds)
-        then: 'cm handle id to status map populated as expected'
-            def expectedMap = ['CmHandle1':responseStatus,'CmHandle2':responseStatus]
-            expectedMap == result
-    }
-}
\ No newline at end of file
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventConsumerSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventConsumerSpec.groovy
new file mode 100644 (file)
index 0000000..ba02f53
--- /dev/null
@@ -0,0 +1,154 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.avcsubscription
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.onap.cps.ncmp.dmi.TestUtils
+import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
+import org.spockframework.spring.SpringBean
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+
+import java.sql.Timestamp
+import java.time.Duration
+import java.time.OffsetDateTime
+import java.time.ZoneId
+
+@SpringBootTest(classes = [SubscriptionEventConsumer])
+@Testcontainers
+@DirtiesContext
+class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
+
+    def objectMapper = new ObjectMapper()
+    def testTopic = 'dmi-ncmp-cm-avc-subscription'
+
+    @SpringBean
+    SubscriptionEventConsumer objectUnderTest = new SubscriptionEventConsumer(cloudEventKafkaTemplate)
+
+    def 'Sends subscription cloud event response successfully.'() {
+        given: 'an subscription event response'
+            objectUnderTest.dmiName = 'test-ncmp-dmi'
+            objectUnderTest.cmAvcSubscriptionResponseTopic = testTopic
+            def responseStatus = SubscriptionStatus.Status.ACCEPTED
+            def subscriptionStatuses = [new SubscriptionStatus(id: 'CmHandle1', status: responseStatus),
+                                        new SubscriptionStatus(id: 'CmHandle2', status: responseStatus)]
+            def subscriptionEventResponseData = new Data(subscriptionName: 'cm-subscription-001',
+                clientId: 'SCO-9989752', dmiName: 'ncmp-dmi-plugin', subscriptionStatus: subscriptionStatuses)
+            def subscriptionEventResponse =
+                    new SubscriptionEventResponse().withData(subscriptionEventResponseData)
+        and: 'consumer has a subscription'
+            kafkaConsumer.subscribe([testTopic] as List<String>)
+        when: 'an event is published'
+            def eventKey = UUID.randomUUID().toString()
+            objectUnderTest.sendSubscriptionResponseMessage(eventKey, "subscriptionCreatedStatus", subscriptionEventResponse)
+        and: 'topic is polled'
+            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+        then: 'poll returns one record'
+            assert records.size() == 1
+            def record = records.iterator().next()
+        and: 'the record value matches the expected event value'
+            def expectedValue = objectMapper.writeValueAsString(subscriptionEventResponse)
+            assert expectedValue == record.value
+            assert eventKey == record.key
+    }
+
+    def 'Consume valid message.'() {
+        given: 'an event'
+            objectUnderTest.dmiName = 'test-ncmp-dmi'
+            def eventKey = UUID.randomUUID().toString()
+            def timestamp = new Timestamp(1679521929511)
+            def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
+            def subscriptionEvent = objectMapper.readValue(jsonData, SubscriptionEvent.class)
+            objectUnderTest.cmAvcSubscriptionResponseTopic = testTopic
+            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
+                    .withType("subscriptionCreated")
+                    .withDataSchema(URI.create("urn:cps:" + SubscriptionEvent.class.getName() + ":1.0.0"))
+                    .withExtension("correlationid", eventKey)
+                    .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
+                    .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
+            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
+        when: 'the valid event is consumed'
+            objectUnderTest.consumeSubscriptionEvent(testEventSent)
+        then: 'no exception is thrown'
+            noExceptionThrown()
+    }
+
+    def 'Consume invalid message.'() {
+        given: 'an invalid event type'
+            objectUnderTest.dmiName = 'test-ncmp-dmi'
+            def eventKey = UUID.randomUUID().toString()
+            def timestamp = new Timestamp(1679521929511)
+            objectUnderTest.cmAvcSubscriptionResponseTopic = testTopic
+            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
+                .withType("subscriptionCreated")
+                .withDataSchema(URI.create("urn:cps:" + SubscriptionEventResponse.class.getName() + ":1.0.0"))
+                .withTime(OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneId.of("UTC")))
+                .withExtension("correlationid", eventKey).build()
+            def testEventSent = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, eventKey, cloudEvent)
+        when: 'the invalid event is consumed'
+            objectUnderTest.consumeSubscriptionEvent(testEventSent)
+        then: 'no exception is thrown and event is logged'
+            noExceptionThrown()
+    }
+
+    def 'Form a SubscriptionEventResponse from a SubscriptionEvent.'() {
+        given: 'a SubscriptionEvent'
+            def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
+            def subscriptionEvent = objectMapper.readValue(jsonData, SubscriptionEvent.class)
+        when: 'a SubscriptionResponseEvent is formed'
+            def result = objectUnderTest.formSubscriptionEventResponse(subscriptionEvent)
+        then: 'Confirm SubscriptionEventResponse was formed as expected'
+            assert result.data.clientId == "SCO-9989752"
+            assert result.data.subscriptionName == "cm-subscription-001"
+    }
+
+    def 'Extract cm handle ids from cm handle successfully.'() {
+        given: 'a list of cm handles'
+            def cmHandleIds =
+                [new CmHandle(id:'CmHandle1', additionalProperties: ['prop-x':'prop-valuex']),
+                 new CmHandle(id:'CmHandle2', additionalProperties: ['prop-y':'prop-valuey'])]
+        when: 'extract the cm handle ids'
+            def result = objectUnderTest.extractCmHandleIds(cmHandleIds)
+        then: 'cm handle ids are extracted as expected'
+            def expectedCmHandleIds = ['CmHandle1', 'CmHandle2'] as Set
+            assert expectedCmHandleIds == result
+    }
+
+    def 'Populate cm handle id to subscriptionStatus successfully.'() {
+        given: 'a set of cm handle id'
+            def cmHandleIds = ['CmHandle1', 'CmHandle2'] as Set
+            def responseStatus = SubscriptionStatus.Status.ACCEPTED
+        when: 'populate cm handle id to subscriptionStatus'
+            def result = objectUnderTest.populateSubscriptionStatus(cmHandleIds).status
+        then: 'cm handle id to subscriptionStatus populated as expected'
+            def expectedStatus = [responseStatus,responseStatus]
+            expectedStatus == result
+    }
+}
\ No newline at end of file
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventResponseMapperSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avcsubscription/SubscriptionEventResponseMapperSpec.groovy
new file mode 100644 (file)
index 0000000..d7b43aa
--- /dev/null
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ *  Copyright (C) 2023 Nordix Foundation
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.avcsubscription
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.ncmp.dmi.TestUtils
+import org.onap.cps.ncmp.dmi.exception.CloudEventConstructionException
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.Data
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionEventResponse
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.CmHandle
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.ncmp_to_dmi.SubscriptionEvent
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
+
+@SpringBootTest(classes = [ObjectMapper])
+class SubscriptionEventResponseMapperSpec extends Specification {
+
+    @Autowired
+    def objectMapper = new ObjectMapper()
+
+    @SpringBean
+    SubscriptionEventResponseMapper objectUnderTest = new SubscriptionEventResponseMapper()
+
+    def 'Convert a SubscriptionResponseEvent to CloudEvent successfully.'() {
+        given: 'a SubscriptionResponseEvent and an event key'
+            def dmiName = 'test-ncmp-dmi'
+            def responseStatus = SubscriptionStatus.Status.ACCEPTED
+            def subscriptionStatuses = [new SubscriptionStatus(id: 'CmHandle1', status: responseStatus),
+                                        new SubscriptionStatus(id: 'CmHandle2', status: responseStatus)]
+            def subscriptionEventResponseData = new Data(subscriptionName: 'cm-subscription-001',
+                    clientId: 'SCO-9989752', dmiName: 'ncmp-dmi-plugin', subscriptionStatus: subscriptionStatuses)
+            def subscriptionEventResponse =
+                    new SubscriptionEventResponse().withData(subscriptionEventResponseData)
+        when: 'a SubscriptionResponseEvent is converted'
+            def result = objectUnderTest.toCloudEvent(subscriptionEventResponse,"subscriptionCreatedStatus", dmiName)
+        then: 'SubscriptionResponseEvent is converted as expected'
+            def expectedCloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
+                .withType("subscriptionCreated")
+                .withDataSchema(URI.create("urn:cps:" + SubscriptionEventResponse.class.getName() + ":1.0.0"))
+                .withExtension("correlationid", subscriptionEventResponse.getData().getClientId() + ":"
+                        + subscriptionEventResponse.getData().getSubscriptionName())
+                .withData(objectMapper.writeValueAsBytes(subscriptionEventResponse)).build()
+            assert expectedCloudEvent.data == result.data
+    }
+
+    def 'Map the Cloud Event to data of the subscription event with incorrect content causes an exception'() {
+        given: 'an empty subscription response event and event key'
+            def dmiName = 'test-ncmp-dmi'
+            def testSubscriptionEventResponse = new SubscriptionEventResponse()
+        when: 'the subscription response event map to data of cloud event'
+            objectUnderTest.toCloudEvent(testSubscriptionEventResponse, "subscriptionCreatedStatus", dmiName)
+        then: 'a run time exception is thrown'
+           thrown(CloudEventConstructionException)
+    }
+
+    def 'Convert a CloudEvent to SubscriptionEvent.'() {
+        given: 'a CloudEvent'
+            def eventKey = UUID.randomUUID().toString()
+            def jsonData = TestUtils.getResourceFileContent('avcSubscriptionCreationEvent.json')
+            def subscriptionEvent = objectMapper.readValue(jsonData, SubscriptionEvent.class)
+            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
+                    .withType("subscriptionCreated")
+                    .withDataSchema(URI.create("urn:cps:" + SubscriptionEvent.class.getName() + ":1.0.0"))
+                    .withExtension("correlationid", eventKey)
+                    .withData(objectMapper.writeValueAsBytes(subscriptionEvent)).build()
+        when: 'a SubscriptionEvent is formed'
+            def result = objectUnderTest.toSubscriptionEvent(cloudEvent)
+        then: 'Confirm SubscriptionEvent was formed as expected'
+            assert result == subscriptionEvent
+    }
+
+    def 'Convert a CloudEvent with Null data to SubscriptionEvent.'() {
+        given: 'a CloudEvent with null data'
+            def eventKey = UUID.randomUUID().toString()
+            def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
+                    .withType("subscriptionCreated")
+                    .withDataSchema(URI.create("urn:cps:" + SubscriptionEvent.class.getName() + ":1.0.0"))
+                    .withExtension("correlationid", eventKey)
+                    .withData(null).build()
+        when: 'a SubscriptionEvent is formed'
+            def result = objectUnderTest.toSubscriptionEvent(cloudEvent)
+        then: 'Confirm SubscriptionEventResponse was formed as expected'
+            assert result == null
+    }
+}
\ No newline at end of file
index d52a91e..8fa1004 100644 (file)
@@ -1,45 +1,39 @@
 {
-  "version": "1.0",
-  "eventType": "CREATE",
-  "event": {
-    "subscription": {
-      "clientID": "SCO-9989752",
-      "name": "cm-subscription-001"
-    },
+  "data": {
     "dataType": {
-      "dataspace": "ALL",
       "dataCategory": "CM",
       "dataProvider": "CM-SERVICE",
-      "schemaName": "org.onap.ncmp:cm-network-avc-event.rfc8641",
-      "schemaVersion": "1.0"
+      "dataspace": "ALL"
     },
     "predicates": {
+      "datastore": "passthrough-running",
       "targets": [
         {
-          "CMHandle1": {
-            "cmhandle-properties": {
-              "prop1": "prop-value"
-            }
+          "id": "CMHandle1",
+          "additional-properties": {
+            "prop1": "prop-value"
           }
         },
         {
-          "CMHandle2": {
-            "cmhandle-properties": {
-              "prop-x": "prop-valuex",
-              "prop-p": "prop-valuep"
-            }
+          "id": "CMHandle2",
+          "additional-properties": {
+            "prop-x": "prop-valuex",
+            "prop-p": "prop-valuep"
           }
         },
         {
-          "CMHandle3": {
-            "cmhandle-properties": {
-              "prop-y": "prop-valuey"
-            }
+          "id": "CMHandle3",
+          "additional-properties": {
+            "prop-y": "prop-valuey"
           }
         }
       ],
-      "datastore": "passthrough-running",
-      "xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
+      "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
+    },
+    "subscription": {
+      "clientID": "SCO-9989752",
+      "name": "cm-subscription-001",
+      "isTagged": false
     }
   }
 }
\ No newline at end of file