Ncmp Producer Setup 72/129472/7
authormpriyank <priyank.maheshwari@est.tech>
Thu, 2 Jun 2022 10:27:55 +0000 (11:27 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Tue, 7 Jun 2022 13:39:45 +0000 (14:39 +0100)
- Ncmp Kafka Publisher setup
- Test scenarios
- Refactored existing test class which were using kafka producer and
  consumer properties to use from common MessageSpec
- Upcoming : Implementation Proposal and Actual logic to publish the
  correct event

Issue-ID: CPS-1035
Change-Id: I93ae392e8c4e4c85d88ca7858332e79b59e85535
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
cps-application/src/main/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisher.java [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisherSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/MessagingSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/resources/expectedNcmpEvent.json [new file with mode: 0644]

index af886a1..3f005c9 100644 (file)
@@ -86,7 +86,8 @@ app:
     ncmp:\r
         async-m2m:\r
             topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}\r
-\r
+        events:\r
+            topic: ${NCMP_EVENTS_TOPIC:ncmp-events}\r
 notification:\r
     data-updated:\r
         enabled: false\r
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisher.java
new file mode 100644 (file)
index 0000000..52ac468
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.ncmp.cmhandle.lcm.event.NcmpEvent;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.util.concurrent.ListenableFutureCallback;
+
+/**
+ * NcmpEventsPublisher to publish the NcmpEvents on event of CREATE, UPDATE and DELETE.
+ */
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class NcmpEventsPublisher {
+
+    private final KafkaTemplate<String, NcmpEvent> ncmpEventKafkaTemplate;
+
+    /**
+     * NCMP Event publisher.
+     *
+     * @param topicName valid topic name
+     * @param eventKey  message key
+     * @param ncmpEvent message payload
+     */
+    public void publishEvent(final String topicName, final String eventKey, final NcmpEvent ncmpEvent) {
+        final ListenableFuture<SendResult<String, NcmpEvent>> ncmpEventFuture =
+                ncmpEventKafkaTemplate.send(topicName, eventKey, ncmpEvent);
+
+        ncmpEventFuture.addCallback(new ListenableFutureCallback<>() {
+            @Override
+            public void onFailure(final Throwable throwable) {
+                log.error("Unable to publish event to topic : {} due to {}", topicName, throwable.getMessage());
+            }
+
+            @Override
+            public void onSuccess(final SendResult<String, NcmpEvent> result) {
+                log.debug("Successfully published event to topic : {} , NcmpEvent : {}",
+                        result.getRecordMetadata().topic(), result.getProducerRecord().value());
+            }
+        });
+    }
+}
index aa6bf1a..31f179a 100644 (file)
 
 package org.onap.cps.ncmp.api.impl.async
 
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization.StringSerializer
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.mapstruct.factory.Mappers
+import org.onap.cps.ncmp.api.utils.MessagingSpec
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
-import org.springframework.kafka.core.DefaultKafkaProducerFactory
-import org.springframework.kafka.core.KafkaTemplate
-import org.springframework.kafka.support.serializer.JsonSerializer
-import org.testcontainers.containers.KafkaContainer
-import org.testcontainers.spock.Testcontainers
-import org.testcontainers.utility.DockerImageName
-
-import java.time.Duration
-import com.fasterxml.jackson.databind.ObjectMapper
+import org.onap.cps.ncmp.utils.TestUtils
 import org.onap.cps.utils.JsonObjectMapper
-import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.cps.ncmp.utils.TestUtils;
-import org.springframework.boot.test.context.SpringBootTest
 import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.test.annotation.DirtiesContext
-import org.springframework.test.context.DynamicPropertyRegistry
-import org.springframework.test.context.DynamicPropertySource
-import spock.lang.Specification
+import org.testcontainers.spock.Testcontainers
+
+import java.time.Duration
 
-@SpringBootTest(classes = [NcmpAsyncRequestResponseEventProducer, NcmpAsyncRequestResponseEventConsumer])
+@SpringBootTest(classes = [NcmpAsyncRequestResponseEventProducer, NcmpAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
 @Testcontainers
 @DirtiesContext
-class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification {
-
-    static kafkaTestContainer = new KafkaContainer(
-        DockerImageName.parse('confluentinc/cp-kafka:6.2.1')
-    )
-
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
-    }
-
-    def setupSpec() {
-        kafkaTestContainer.start()
-    }
-
-    def producerConfigProperties = [
-        (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)      : kafkaTestContainer.getBootstrapServers().split(',')[0],
-        (ProducerConfig.RETRIES_CONFIG)                : 0,
-        (ProducerConfig.BATCH_SIZE_CONFIG)             : 16384,
-        (ProducerConfig.LINGER_MS_CONFIG)              : 1,
-        (ProducerConfig.BUFFER_MEMORY_CONFIG)          : 33554432,
-        (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)   : StringSerializer,
-        (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) : JsonSerializer
-    ]
-
-    def consumerConfigProperties = [
-        (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)       : kafkaTestContainer.getBootstrapServers().split(',')[0],
-        (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)  : StringDeserializer,
-        (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG): StringDeserializer,
-        (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)       : 'earliest',
-        (ConsumerConfig.GROUP_ID_CONFIG)                : 'test'
-    ]
-
-    def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties))
+class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingSpec {
 
     @SpringBean
     NcmpAsyncRequestResponseEventProducer cpsAsyncRequestResponseEventProducerService =
@@ -96,9 +54,10 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification
             new NcmpAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducerService,
                     ncmpAsyncRequestResponseEventMapper)
 
-    def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
+    @Autowired
+    JsonObjectMapper jsonObjectMapper
 
-    def kafkaConsumer = new KafkaConsumer<>(getConsumerConfigProperties())
+    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('test'))
 
     def 'Consume and forward valid message'() {
         given: 'consumer has a subscription'
@@ -118,9 +77,4 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends Specification
                     NcmpAsyncRequestResponseEvent).getForwardedEvent().getEventId())
     }
 
-    @DynamicPropertySource
-    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
-        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
-    }
-
 }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisherSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsPublisherSpec.groovy
new file mode 100644 (file)
index 0000000..774a465
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.event
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.cps.ncmp.api.utils.MessagingSpec
+import org.onap.cps.ncmp.utils.TestUtils
+import org.onap.cps.utils.JsonObjectMapper
+import org.onap.ncmp.cmhandle.lcm.event.Event
+import org.onap.ncmp.cmhandle.lcm.event.NcmpEvent
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+
+import java.time.Duration
+
+@SpringBootTest(classes = [NcmpEventsPublisher, ObjectMapper, JsonObjectMapper])
+@Testcontainers
+@DirtiesContext
+class NcmpEventsPublisherSpec extends MessagingSpec {
+
+    def kafkaConsumer = new KafkaConsumer<>(consumerConfigProperties('ncmp-group'))
+
+    def testTopic = 'ncmp-events-test'
+
+    @SpringBean
+    NcmpEventsPublisher ncmpEventsPublisher = new NcmpEventsPublisher(kafkaTemplate)
+
+    @Autowired
+    JsonObjectMapper jsonObjectMapper
+
+
+    def 'Produce and Consume Ncmp Event'() {
+        given: 'event key and event data'
+            def eventKey = 'ncmp'
+            def eventData = new NcmpEvent(eventId: 'test-uuid',
+                eventCorrelationId: 'cmhandle-as-correlationid',
+                eventSchema: URI.create('org.onap.ncmp.cmhandle.lcm.event:v1'),
+                eventSource: URI.create('org.onap.ncmp'),
+                eventTime: '2022-12-31T20:30:40.000+0000',
+                eventType: 'org.onap.ncmp.cmhandle.lcm.event',
+                event: new Event(cmHandleId: 'cmhandle-test', cmhandleState: 'READY', operation: 'CREATE', cmhandleProperties: [['publicProperty1': 'value1'], ['publicProperty2': 'value2']]))
+        and: 'we have an expected NcmpEvent'
+            def expectedJsonString = TestUtils.getResourceFileContent('expectedNcmpEvent.json')
+            def expectedNcmpEvent = jsonObjectMapper.convertJsonString(expectedJsonString, NcmpEvent.class)
+        and: 'consumer has a subscription'
+            kafkaConsumer.subscribe([testTopic] as List<String>)
+        when: 'an event is published'
+            ncmpEventsPublisher.publishEvent(testTopic, eventKey, eventData)
+        and: 'topic is polled'
+            def records = kafkaConsumer.poll(Duration.ofMillis(1500))
+        then: 'no exception is thrown'
+            noExceptionThrown()
+        and: 'poll returns one record'
+            assert records.size() == 1
+        and: 'record key matches the expected event key'
+            def record = records.iterator().next()
+            assert eventKey == record.key
+        and: 'record matches the expected event'
+            assert expectedNcmpEvent == jsonObjectMapper.convertJsonString(record.value, NcmpEvent.class)
+
+    }
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/MessagingSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/utils/MessagingSpec.groovy
new file mode 100644 (file)
index 0000000..097834a
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2022 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.utils
+
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.springframework.kafka.core.DefaultKafkaProducerFactory
+import org.springframework.kafka.core.KafkaTemplate
+import org.springframework.kafka.support.serializer.JsonSerializer
+import org.springframework.test.context.DynamicPropertyRegistry
+import org.springframework.test.context.DynamicPropertySource
+import org.testcontainers.containers.KafkaContainer
+import org.testcontainers.utility.DockerImageName
+import spock.lang.Specification
+
+class MessagingSpec extends Specification {
+
+    static {
+        Runtime.getRuntime().addShutdownHook(new Thread(kafkaTestContainer::stop))
+    }
+
+    def setupSpec() {
+        kafkaTestContainer.start()
+    }
+
+    static kafkaTestContainer = new KafkaContainer(DockerImageName.parse('confluentinc/cp-kafka:6.2.1'))
+
+    def producerConfigProperties() {
+        return [('bootstrap.servers'): kafkaTestContainer.getBootstrapServers().split(',')[0],
+                ('retries')          : 0,
+                ('batch-size')       : 16384,
+                ('linger.ms')        : 1,
+                ('buffer.memory')    : 33554432,
+                ('key.serializer')   : StringSerializer,
+                ('value.serializer') : JsonSerializer]
+    }
+
+    def consumerConfigProperties(consumerGroupId) {
+        return [('bootstrap.servers') : kafkaTestContainer.getBootstrapServers().split(',')[0],
+                ('key.deserializer')  : StringDeserializer,
+                ('value.deserializer'): StringDeserializer,
+                ('auto.offset.reset') : 'earliest',
+                ('group.id')          : consumerGroupId
+        ]
+    }
+
+    def kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<Integer, String>(producerConfigProperties()))
+
+    @DynamicPropertySource
+    static void registerKafkaProperties(DynamicPropertyRegistry dynamicPropertyRegistry) {
+        dynamicPropertyRegistry.add('spring.kafka.bootstrap-servers', kafkaTestContainer::getBootstrapServers)
+    }
+}
diff --git a/cps-ncmp-service/src/test/resources/expectedNcmpEvent.json b/cps-ncmp-service/src/test/resources/expectedNcmpEvent.json
new file mode 100644 (file)
index 0000000..903bc3a
--- /dev/null
@@ -0,0 +1,21 @@
+{
+  "eventId": "test-uuid",
+  "eventCorrelationId": "cmhandle-as-correlationid",
+  "eventTime": "2022-12-31T20:30:40.000+0000",
+  "eventSource": "org.onap.ncmp",
+  "eventType": "org.onap.ncmp.cmhandle.lcm.event",
+  "eventSchema": "org.onap.ncmp.cmhandle.lcm.event:v1",
+  "event": {
+    "cmHandleId": "cmhandle-test",
+    "operation": "CREATE",
+    "cmhandle-state": "READY",
+    "cmhandle-properties": [
+      {
+        "publicProperty1": "value1"
+      },
+      {
+        "publicProperty2": "value2"
+      }
+    ]
+  }
+}
\ No newline at end of file