DMI Data AVC RFC8641 and CloudEvent Compliant 08/135008/5
authormpriyank <priyank.maheshwari@est.tech>
Tue, 20 Jun 2023 12:42:31 +0000 (13:42 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Fri, 30 Jun 2023 11:29:19 +0000 (12:29 +0100)
- Introduced CloudEvents for DMI Data AVC Events
- Kafkatemplate config to support legacy as well as CloudEvents
- AvcEvent to be compliant with RFC8641 schema format
- Updating the released version of CPS and NCMP 3.3.3
- Refactored the test code to handle the changes related to CloudEvents

Issue-ID: CPS-1719
Change-Id: I082bbceda6dc26c860e1eff977ede219296d1875
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
13 files changed:
pom.xml
src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java [new file with mode: 0644]
src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java [new file with mode: 0644]
src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java [deleted file]
src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventProducer.java
src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventSimulationController.java
src/main/resources/application.yml
src/test/groovy/org/onap/cps/ncmp/dmi/api/kafka/MessagingBaseSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy [new file with mode: 0644]
src/test/groovy/org/onap/cps/ncmp/dmi/notifications/async/AsyncTaskExecutorIntegrationSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/AvcEventExecutorIntegrationSpec.groovy
src/test/groovy/org/onap/cps/ncmp/dmi/notifications/avc/SubscriptionEventConsumerSpec.groovy
src/test/resources/application.yml

diff --git a/pom.xml b/pom.xml
index e35c840..e08f725 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -40,7 +40,7 @@
     <properties>
         <app>org.onap.cps.ncmp.dmi.Application</app>
         <base.image>${docker.pull.registry}/onap/integration-java11:8.0.0</base.image>
-        <cps.version>3.2.6</cps.version>
+        <cps.version>3.3.3</cps.version>
         <image.tag>${project.version}-${maven.build.timestamp}</image.tag>
         <jacoco.minimum.coverage>0.98</jacoco.minimum.coverage>
         <maven.build.timestamp.format>yyyyMMdd'T'HHmmss'Z'</maven.build.timestamp.format>
                 <artifactId>swagger-annotations</artifactId>
                 <version>2.2.10</version>
             </dependency>
+            <dependency>
+                <groupId>io.cloudevents</groupId>
+                <artifactId>cloudevents-json-jackson</artifactId>
+                <version>2.5.0</version>
+            </dependency>
+            <dependency>
+                <groupId>io.cloudevents</groupId>
+                <artifactId>cloudevents-kafka</artifactId>
+                <version>2.5.0</version>
+            </dependency>
+            <dependency>
+                <groupId>io.cloudevents</groupId>
+                <artifactId>cloudevents-spring</artifactId>
+                <version>2.5.0</version>
+            </dependency>
             <dependency>
                 <groupId>net.logstash.logback</groupId>
                 <artifactId>logstash-logback-encoder</artifactId>
             <groupId>io.swagger.core.v3</groupId>
             <artifactId>swagger-annotations</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-json-jackson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.cloudevents</groupId>
+            <artifactId>cloudevents-spring</artifactId>
+        </dependency>
         <dependency>
             <groupId>net.logstash.logback</groupId>
             <artifactId>logstash-logback-encoder</artifactId>
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java b/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java
new file mode 100644 (file)
index 0000000..cb617f9
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.config.kafka;
+
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+/**
+ * kafka Configuration for legacy and cloud events.
+ *
+ * @param <T> valid legacy event to be published over the wire.
+ */
+@Configuration
+@EnableKafka
+@RequiredArgsConstructor
+public class KafkaConfig<T> {
+
+    private final KafkaProperties kafkaProperties;
+
+    /**
+     * This sets the strategy for creating legacy Kafka producer instance from kafka properties defined into
+     * application.yml and replaces value-serializer by JsonSerializer.
+     *
+     * @return legacy event producer instance.
+     */
+    @Bean
+    public ProducerFactory<String, T> legacyEventProducerFactory() {
+        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+        producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+        return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+    }
+
+    /**
+     * The ConsumerFactory implementation is to produce new legacy instance for provided kafka properties defined
+     * into application.yml and replaces deserializer-value by JsonDeserializer.
+     *
+     * @return an instance of legacy consumer factory.
+     */
+    @Bean
+    public ConsumerFactory<String, T> legacyEventConsumerFactory() {
+        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+        consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
+        return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+    }
+
+    /**
+     * This sets the strategy for creating cloud Kafka producer instance from kafka properties defined into
+     * application.yml with CloudEventSerializer.
+     *
+     * @return cloud event producer instance.
+     */
+    @Bean
+    public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
+        final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties();
+        return new DefaultKafkaProducerFactory<>(producerConfigProperties);
+    }
+
+    /**
+     * The ConsumerFactory implementation to produce new legacy instance for provided kafka properties defined
+     * into application.yml having CloudEventDeserializer as deserializer-value.
+     *
+     * @return an instance of cloud consumer factory.
+     */
+    @Bean
+    public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
+        final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties();
+        return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
+    }
+
+    /**
+     * A legacy Kafka event template for executing high-level operations. The legacy producer factory ensure this.
+     *
+     * @return an instance of legacy Kafka template.
+     */
+    @Bean
+    @Primary
+    public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
+        final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+        kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
+        return kafkaTemplate;
+    }
+
+    /**
+     * A cloud Kafka event template for executing high-level operations. The cloud producer factory ensure this.
+     *
+     * @return an instance of cloud Kafka template.
+     */
+    @Bean
+    public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
+        final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
+        kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
+        return kafkaTemplate;
+    }
+
+}
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcCloudEventCreator.java
new file mode 100644 (file)
index 0000000..b8bd277
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.notifications.avc;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.builder.CloudEventBuilder;
+import java.net.URI;
+import java.time.format.DateTimeFormatter;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
+import org.onap.cps.ncmp.events.avc1_0_0.Data;
+import org.onap.cps.ncmp.events.avc1_0_0.DatastoreChanges;
+import org.onap.cps.ncmp.events.avc1_0_0.Edit;
+import org.onap.cps.ncmp.events.avc1_0_0.IetfYangPatchYangPatch;
+import org.onap.cps.ncmp.events.avc1_0_0.PushChangeUpdate;
+import org.onap.cps.ncmp.events.avc1_0_0.Value;
+
+/**
+ * Helper to create AvcEvents.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DmiDataAvcCloudEventCreator {
+
+    private static final DateTimeFormatter dateTimeFormatter =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
+
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Creates CloudEvent for DMI Data AVC.
+     *
+     * @param eventCorrelationId correlationid
+     * @return Cloud Event
+     */
+    public static CloudEvent createCloudEvent(final String eventCorrelationId) {
+
+        CloudEvent cloudEvent = null;
+
+        try {
+            cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create("NCMP"))
+                    .withType(AvcEvent.class.getName())
+                    .withDataSchema(URI.create("urn:cps:" + AvcEvent.class.getName() + ":1.0.0"))
+                    .withExtension("correlationid", eventCorrelationId)
+                    .withData(objectMapper.writeValueAsBytes(createDmiDataAvcEvent())).build();
+        } catch (final JsonProcessingException jsonProcessingException) {
+            log.error("Unable to convert object to json : {}", jsonProcessingException.getMessage());
+        }
+
+        return cloudEvent;
+    }
+
+    private static AvcEvent createDmiDataAvcEvent() {
+        final AvcEvent avcEvent = new AvcEvent();
+        final Data data = new Data();
+        final PushChangeUpdate pushChangeUpdate = new PushChangeUpdate();
+        final DatastoreChanges datastoreChanges = new DatastoreChanges();
+        final IetfYangPatchYangPatch ietfYangPatchYangPatch = new IetfYangPatchYangPatch();
+        ietfYangPatchYangPatch.setPatchId("abcd");
+        final Edit edit1 = new Edit();
+        final Value value = new Value();
+        final Map<String, Object> attributeMap = new LinkedHashMap<>();
+        attributeMap.put("isHoAllowed", false);
+        value.setAttributes(List.of(attributeMap));
+        edit1.setEditId("editId");
+        edit1.setOperation("replace");
+        edit1.setTarget("target_xpath");
+        edit1.setValue(value);
+        ietfYangPatchYangPatch.setEdit(List.of(edit1));
+        datastoreChanges.setIetfYangPatchYangPatch(ietfYangPatchYangPatch);
+        pushChangeUpdate.setDatastoreChanges(datastoreChanges);
+        data.setPushChangeUpdate(pushChangeUpdate);
+
+        avcEvent.setData(data);
+        return avcEvent;
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java b/src/main/java/org/onap/cps/ncmp/dmi/notifications/avc/DmiDataAvcEventCreator.java
deleted file mode 100644 (file)
index 03ed1c4..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.dmi.notifications.avc;
-
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
-
-/**
- * Helper to create AvcEvents.
- */
-@Slf4j
-public class DmiDataAvcEventCreator {
-
-    private static final DateTimeFormatter dateTimeFormatter
-            = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
-
-    /**
-     * Create an AVC event.
-     *
-     * @param eventCorrelationId  the event correlation id
-     * @return DmiAsyncRequestResponseEvent
-     */
-    public AvcEvent createEvent(final String eventCorrelationId) {
-        final AvcEvent avcEvent = new AvcEvent();
-        avcEvent.setEventId(UUID.randomUUID().toString());
-        avcEvent.setEventCorrelationId(eventCorrelationId);
-        avcEvent.setEventType(AvcEvent.class.getName());
-        avcEvent.setEventSchema("urn:cps:" + AvcEvent.class.getName());
-        avcEvent.setEventSchemaVersion("v1");
-        avcEvent.setEventSource("NCMP");
-        avcEvent.setEventTime(ZonedDateTime.now().format(dateTimeFormatter));
-
-        final Map<String, Object> eventPayload = new LinkedHashMap<>();
-        eventPayload.put("push-change-update", "{}");
-        avcEvent.setEvent(eventPayload);
-
-        log.debug("Avc Event Created ID: {}", avcEvent.getEventId());
-        return avcEvent;
-    }
-
-}
\ No newline at end of file
index 4fd46b6..075dcf2 100644 (file)
 
 package org.onap.cps.ncmp.dmi.notifications.avc;
 
+
+import io.cloudevents.CloudEvent;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 
@@ -31,16 +33,18 @@ import org.springframework.stereotype.Service;
 @RequiredArgsConstructor
 public class DmiDataAvcEventProducer {
 
-    private final KafkaTemplate<String, AvcEvent> kafkaTemplate;
-
+    private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
+    
     /**
-     * Sends message to the configured topic with a message key.
+     * Publishing DMI Data AVC event payload as CloudEvent.
      *
-     * @param requestId the request id
-     * @param avcEvent the event to publish
+     * @param requestId     the request id
+     * @param cloudAvcEvent event with data as DMI DataAVC event
      */
-    public void sendMessage(final String requestId, final AvcEvent avcEvent) {
-        kafkaTemplate.send("dmi-cm-events", requestId, avcEvent);
+    public void publishDmiDataAvcCloudEvent(final String requestId, final CloudEvent cloudAvcEvent) {
+        final ProducerRecord<String, CloudEvent> producerRecord =
+                new ProducerRecord<>("dmi-cm-events", requestId, cloudAvcEvent);
+        cloudEventKafkaTemplate.send(producerRecord);
         log.debug("AVC event sent");
     }
 }
index f7f4bf9..c5fb8fb 100644 (file)
 
 package org.onap.cps.ncmp.dmi.notifications.avc;
 
+import io.cloudevents.CloudEvent;
 import java.util.UUID;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.event.model.AvcEvent;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -42,18 +42,18 @@ public class DmiDataAvcEventSimulationController {
 
     /**
      * Simulate Event for AVC.
+     *
      * @param numberOfSimulatedEvents number of events to be generated
      * @return ResponseEntity
      */
     @GetMapping(path = "/v1/simulateDmiDataEvent")
-    public ResponseEntity<Void> simulateEvents(@RequestParam("numberOfSimulatedEvents")
-                                                   final Integer numberOfSimulatedEvents) {
-        final DmiDataAvcEventCreator dmiDataAvcEventCreator = new DmiDataAvcEventCreator();
+    public ResponseEntity<Void> simulateEvents(
+            @RequestParam("numberOfSimulatedEvents") final Integer numberOfSimulatedEvents) {
 
         for (int i = 0; i < numberOfSimulatedEvents; i++) {
             final String eventCorrelationId = UUID.randomUUID().toString();
-            final AvcEvent avcEvent = dmiDataAvcEventCreator.createEvent(eventCorrelationId);
-            dmiDataAvcEventProducer.sendMessage(eventCorrelationId, avcEvent);
+            final CloudEvent cloudEvent = DmiDataAvcCloudEventCreator.createCloudEvent(eventCorrelationId);
+            dmiDataAvcEventProducer.publishDmiDataAvcCloudEvent(eventCorrelationId, cloudEvent);
         }
 
         return new ResponseEntity<>(HttpStatus.OK);
index 0400143..d964748 100644 (file)
@@ -51,7 +51,7 @@ spring:
       protocol: PLAINTEXT
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+      value-serializer: io.cloudevents.kafka.CloudEventSerializer
       client-id: ncmp-dmi-plugin
     consumer:
       group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group}
@@ -59,9 +59,13 @@ spring:
       value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
       properties:
         spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
-        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
+        spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer
         spring.json.use.type.headers: false
 
+    jackson:
+      serialization:
+        FAIL_ON_EMPTY_BEANS: false
+
 app:
   ncmp:
     async:
index e5f00f3..13dd043 100644 (file)
 
 package org.onap.cps.ncmp.dmi.api.kafka
 
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.serialization.StringSerializer
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaProducerFactory
 import org.springframework.kafka.core.KafkaTemplate
 import org.springframework.kafka.support.serializer.JsonSerializer
@@ -45,28 +47,30 @@ class MessagingBaseSpec extends Specification {
 
     static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('registry.nordix.org/onaptest/confluentinc/cp-kafka:6.2.1').asCompatibleSubstituteFor('confluentinc/cp-kafka'))
 
-    def producerConfigProperties() {
+    def producerConfigProperties(valueSerializer) {
         return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
                 ('retries')          : 0,
                 ('batch-size')       : 16384,
                 ('linger.ms')        : 1,
                 ('buffer.memory')    : 33554432,
                 ('key.serializer')   : StringSerializer,
-                ('value.serializer') : JsonSerializer]
+                ('value.serializer') : valueSerializer]
     }
 
-    def consumerConfigProperties(consumerGroupId) {
+    def consumerConfigProperties(consumerGroupId, valueDeserializer) {
         return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
                 ('key.deserializer')  : StringDeserializer,
-                ('value.deserializer'): StringDeserializer,
+                ('value.deserializer'): valueDeserializer,
                 ('auto.offset.reset') : 'earliest',
                 ('group.id')          : consumerGroupId
         ]
     }
 
-    def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
+    def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties(JsonSerializer)))
+    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', StringDeserializer))
 
-    def consumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+    def cloudEventKafkaTemplate = new KafkaTemplate(new DefaultKafkaProducerFactory<String, CloudEvent>(producerConfigProperties(CloudEventSerializer)))
+    def cloudEventKafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-test-group', CloudEventDeserializer))
 
     @DynamicPropertySource
     static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
diff --git a/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy b/src/test/groovy/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfigSpec.groovy
new file mode 100644 (file)
index 0000000..f09434b
--- /dev/null
@@ -0,0 +1,42 @@
+package org.onap.cps.ncmp.dmi.config.kafka
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import org.spockframework.spring.EnableSharedInjection
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonDeserializer
+import org.springframework.kafka.support.serializer.JsonSerializer
+import spock.lang.Shared
+import spock.lang.Specification
+
+@SpringBootTest(classes = [KafkaProperties, KafkaConfig])
+@EnableSharedInjection
+@EnableConfigurationProperties
+class KafkaConfigSpec extends Specification {
+
+    @Shared
+    @Autowired
+    KafkaTemplate<String, String> legacyEventKafkaTemplate
+
+    @Shared
+    @Autowired
+    KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate
+
+    def 'Verify kafka template serializer and deserializer configuration for #eventType.'() {
+        expect: 'kafka template is instantiated'
+            assert kafkaTemplateInstance.properties['beanName'] == beanName
+        and: 'verify event key and value serializer'
+            assert kafkaTemplateInstance.properties['producerFactory'].configs['value.serializer'].asType(String.class).contains(valueSerializer.getCanonicalName())
+        and: 'verify event key and value deserializer'
+            assert kafkaTemplateInstance.properties['consumerFactory'].configs['spring.deserializer.value.delegate.class'].asType(String.class).contains(delegateDeserializer.getCanonicalName())
+        where: 'the following event type is used'
+            eventType      | kafkaTemplateInstance    || beanName                   | valueSerializer      | delegateDeserializer
+            'legacy event' | legacyEventKafkaTemplate || 'legacyEventKafkaTemplate' | JsonSerializer       | JsonDeserializer
+            'cloud event'  | cloudEventKafkaTemplate  || 'cloudEventKafkaTemplate'  | CloudEventSerializer | CloudEventDeserializer
+    }
+}
index 96e2c16..7ca2d54 100644 (file)
@@ -49,18 +49,18 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec {
 
     def setup() {
         cpsAsyncRequestResponseEventProducer.dmiNcmpTopic = TEST_TOPIC
-        consumer.subscribe([TEST_TOPIC] as List<String>)
+        kafkaConsumer.subscribe([TEST_TOPIC] as List<String>)
     }
 
     def cleanup() {
-        consumer.close()
+        kafkaConsumer.close()
     }
 
     def 'Publish and Subscribe message - success'() {
         when: 'a successful event is published'
             objectUnderTest.publishAsyncEvent(TEST_TOPIC, '12345','{}', 'OK', '200')
         and: 'the topic is polled'
-            def records = consumer.poll(Duration.ofMillis(1500))
+            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'the record received is the event sent'
             def record = records.iterator().next()
             DmiAsyncRequestResponseEvent event  = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
@@ -74,7 +74,7 @@ class AsyncTaskExecutorIntegrationSpec extends MessagingBaseSpec {
             def exception = new HttpClientRequestException('some cm handle', 'Node not found', HttpStatus.INTERNAL_SERVER_ERROR)
             objectUnderTest.publishAsyncFailureEvent(TEST_TOPIC, '67890', exception)
         and: 'the topic is polled'
-            def records = consumer.poll(Duration.ofMillis(1500))
+            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'the record received is the event sent'
             def record = records.iterator().next()
             DmiAsyncRequestResponseEvent event  = spiedObjectMapper.readValue(record.value(), DmiAsyncRequestResponseEvent)
index 5f7ed87..a7557bb 100644 (file)
 package org.onap.cps.ncmp.dmi.notifications.avc
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.jackson.PojoCloudEventDataMapper
 import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
-import org.onap.cps.ncmp.dmi.notifications.async.AsyncTaskExecutor
-import org.onap.cps.ncmp.dmi.service.DmiService
-import org.onap.cps.ncmp.dmi.notifications.avc.DmiDataAvcEventSimulationController
-import org.onap.cps.ncmp.event.model.AvcEvent
+import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
 import org.spockframework.spring.SpringBean
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
@@ -40,9 +38,9 @@ import java.time.Duration
 class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec {
 
     @SpringBean
-    DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(kafkaTemplate)
+    DmiDataAvcEventProducer dmiDataAvcEventProducer = new DmiDataAvcEventProducer(cloudEventKafkaTemplate)
 
-    def dmiService =  new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer)
+    def dmiService = new DmiDataAvcEventSimulationController(dmiDataAvcEventProducer)
 
     def objectMapper = new ObjectMapper()
 
@@ -50,13 +48,14 @@ class AvcEventExecutorIntegrationSpec extends MessagingBaseSpec {
         given: 'a simulated event'
             dmiService.simulateEvents(1)
         and: 'a consumer subscribed to dmi-cm-events topic'
-            def consumer = new KafkaConsumer<>(consumerConfigProperties('test'))
-            consumer.subscribe(['dmi-cm-events'])
+            cloudEventKafkaConsumer.subscribe(['dmi-cm-events'])
         when: 'the next event record is consumed'
-            def record = consumer.poll(Duration.ofMillis(1500)).iterator().next()
+            def record = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)).iterator().next()
         then: 'record has correct topic'
             assert record.topic == 'dmi-cm-events'
         and: 'the record value can be mapped to an avcEvent'
-            objectMapper.readValue(record.value(), AvcEvent)
+            def dmiDataAvcEvent = record.value()
+            def convertedAvcEvent = CloudEventUtils.mapData(dmiDataAvcEvent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
+            assert convertedAvcEvent != null
     }
 }
\ No newline at end of file
index 65567ef..59873ec 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.cps.ncmp.dmi.notifications.avc
 
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.onap.cps.ncmp.dmi.TestUtils
 import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
 import org.onap.cps.ncmp.dmi.service.model.SubscriptionEventResponse
@@ -39,8 +40,6 @@ import java.time.Duration
 @DirtiesContext
 class SubscriptionEventConsumerSpec extends MessagingBaseSpec {
 
-    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
-
     def objectMapper = new ObjectMapper()
     def testTopic = 'dmi-ncmp-cm-avc-subscription'
 
index d2aafbd..43eb0fc 100644 (file)
@@ -59,8 +59,16 @@ spring:
       protocol: PLAINTEXT
     producer:
       key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+      value-serializer: io.cloudevents.kafka.CloudEventSerializer
       client-id: ncmp-dmi-plugin
+    consumer:
+      group-id: ${NCMP_CONSUMER_GROUP_ID:ncmp-group}
+      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
+      properties:
+        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
+        spring.deserializer.value.delegate.class: io.cloudevents.kafka.CloudEventDeserializer
+        spring.json.use.type.headers: false
 
 app:
   ncmp: