Kafka (De-)Serialization Test 52/135252/1
authorToineSiebelink <toine.siebelink@est.tech>
Thu, 29 Jun 2023 17:00:48 +0000 (18:00 +0100)
committerToineSiebelink <toine.siebelink@est.tech>
Thu, 29 Jun 2023 17:09:30 +0000 (18:09 +0100)
- added test that proof (de-)serialization of DataOperation CloudEvent
- extracted new baseclass for kafka consumer tests

Issue-ID: CPS-1746
Signed-off-by: ToineSiebelink <toine.siebelink@est.tech>
Change-Id: I3b04dc0ed8cd1f1f48206cbcf0fd739532ba423c

cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/FilterStrategiesIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/SerializationIntegrationSpec.groovy [new file with mode: 0644]
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/ConsumerBaseSpec.groovy [new file with mode: 0644]

index 43d0648..783582e 100644 (file)
@@ -23,15 +23,12 @@ package org.onap.cps.ncmp.api.impl.async
 import io.cloudevents.core.builder.CloudEventBuilder
 import org.onap.cps.ncmp.api.impl.config.kafka.KafkaConfig
 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
-import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.api.kafka.ConsumerBaseSpec
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
 import org.spockframework.spring.SpringBean
-import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.beans.factory.annotation.Value
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry
-import org.springframework.kafka.test.utils.ContainerTestUtils
 import org.springframework.test.annotation.DirtiesContext
 import org.testcontainers.spock.Testcontainers
 import java.util.concurrent.TimeUnit
@@ -40,7 +37,7 @@ import java.util.concurrent.TimeUnit
 @DirtiesContext
 @Testcontainers
 @EnableAutoConfiguration
-class FilterStrategiesIntegrationSpec extends MessagingBaseSpec {
+class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
 
     @SpringBean
     EventsPublisher mockEventsPublisher = Mock()
@@ -48,16 +45,9 @@ class FilterStrategiesIntegrationSpec extends MessagingBaseSpec {
     @SpringBean
     NcmpAsyncRequestResponseEventMapper mapper = Stub()
 
-    @Autowired
-    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
-
     @Value('${app.ncmp.async-m2m.topic}')
     def topic
 
-    def setup() {
-        activateListeners()
-    }
-
     def 'Legacy event consumer with cloud event.'() {
         given: 'a cloud event of type: #eventType'
             def cloudEvent = CloudEventBuilder.v1().withId('some id')
@@ -113,9 +103,4 @@ class FilterStrategiesIntegrationSpec extends MessagingBaseSpec {
             0 * mockEventsPublisher.publishCloudEvent(*_)
     }
 
-    def activateListeners() {
-        kafkaListenerEndpointRegistry.getListenerContainers().forEach(
-            messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
-        )
-    }
 }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/SerializationIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/async/SerializationIntegrationSpec.groovy
new file mode 100644 (file)
index 0000000..78a655b
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.impl.async
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import io.cloudevents.core.builder.CloudEventBuilder
+import org.onap.cps.ncmp.api.impl.config.kafka.KafkaConfig
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher
+import org.onap.cps.ncmp.api.kafka.ConsumerBaseSpec
+import org.onap.cps.ncmp.events.async1_0_0.Data
+import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
+import org.onap.cps.ncmp.events.async1_0_0.Response
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+
+import java.util.concurrent.TimeUnit
+
+@SpringBootTest(classes =[DataOperationEventConsumer, RecordFilterStrategies, KafkaConfig])
+@DirtiesContext
+@Testcontainers
+@EnableAutoConfiguration
+class SerializationIntegrationSpec extends ConsumerBaseSpec {
+
+    @SpringBean
+    EventsPublisher mockEventsPublisher = Mock()
+
+    @Autowired
+    private ObjectMapper objectMapper
+
+    @Value('${app.ncmp.async-m2m.topic}')
+    def topic
+
+    def capturedForwardedEvent
+
+    def 'Forwarding DataOperation Event Data.'() {
+        given: 'a data operation cloud event'
+            def cloudEventSent = createCloudEvent()
+        when: 'send the event'
+            cloudEventKafkaTemplate.send(topic, cloudEventSent)
+        and: 'wait a little for async processing of message'
+            TimeUnit.MILLISECONDS.sleep(300)
+        then: 'the event has been forwarded'
+            1 * mockEventsPublisher.publishCloudEvent('some client topic', 'my-event-id', _) >> { args -> { capturedForwardedEvent = args[2] } }
+        and: 'the forwarded event is identical to the event that was sent'
+            assert capturedForwardedEvent == cloudEventSent
+    }
+
+    def createCloudEvent() {
+        def dataOperationEvent = new DataOperationEvent(data: new Data(responses: [new Response()]))
+        return CloudEventBuilder.v1()
+            .withId('my-event-id')
+            .withType('DataOperationEvent')
+            .withSource(URI.create('some-source'))
+            .withExtension('destination','some client topic')
+            .withData(objectMapper.writeValueAsBytes(dataOperationEvent))
+            .build()
+    }
+
+}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/ConsumerBaseSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/kafka/ConsumerBaseSpec.groovy
new file mode 100644 (file)
index 0000000..940c59d
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (c) 2023 Nordix Foundation.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an 'AS IS' BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.api.kafka
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry
+import org.springframework.kafka.test.utils.ContainerTestUtils
+
+@SpringBootTest
+class ConsumerBaseSpec extends MessagingBaseSpec {
+
+    @Autowired
+    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
+
+    def setup() {
+        activateListeners()
+    }
+
+    def activateListeners() {
+        kafkaListenerEndpointRegistry.getListenerContainers().forEach(
+            messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
+        )
+    }
+}