2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.api.impl.async
23 import com.fasterxml.jackson.databind.ObjectMapper
24 import io.cloudevents.CloudEvent
25 import io.cloudevents.kafka.CloudEventDeserializer
26 import io.cloudevents.kafka.CloudEventSerializer
27 import io.cloudevents.kafka.impl.KafkaHeaders
28 import io.cloudevents.core.CloudEventUtils
29 import io.cloudevents.core.builder.CloudEventBuilder
30 import io.cloudevents.jackson.PojoCloudEventDataMapper
31 import org.apache.kafka.clients.consumer.ConsumerRecord
32 import org.apache.kafka.clients.consumer.KafkaConsumer
33 import org.apache.kafka.common.header.internals.RecordHeaders
34 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
35 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
36 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
37 import org.onap.cps.ncmp.utils.TestUtils
38 import org.onap.cps.utils.JsonObjectMapper
39 import org.spockframework.spring.SpringBean
40 import org.springframework.beans.factory.annotation.Autowired
41 import org.springframework.boot.test.context.SpringBootTest
42 import org.springframework.kafka.listener.adapter.RecordFilterStrategy
43 import org.springframework.test.annotation.DirtiesContext
44 import org.testcontainers.spock.Testcontainers
45 import java.time.Duration
47 @SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy,JsonObjectMapper, ObjectMapper])
50 class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
53 EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
56 NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
59 JsonObjectMapper jsonObjectMapper
62 RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
65 ObjectMapper objectMapper
67 def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
68 def static clientTopic = 'client-topic'
69 def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
71 def 'Consume and publish event to client specified topic'() {
72 given: 'consumer subscribing to client topic'
73 cloudEventKafkaConsumer.subscribe([clientTopic])
74 and: 'consumer record for data operation event'
75 def consumerRecordIn = createConsumerRecord(dataOperationType)
76 when: 'the data operation event is consumed and published to client specified topic'
77 objectUnderTest.consumeAndPublish(consumerRecordIn)
78 and: 'the client specified topic is polled'
79 def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
80 then: 'verify cloud compliant headers'
81 def consumerRecordOutHeaders = consumerRecordOut.headers()
82 assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
83 assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid'
84 assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType
85 and: 'verify that extension is included into header'
86 assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
87 and: 'map consumer record to expected event type'
88 def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
89 PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
90 and: 'verify published response data properties'
91 def response = dataOperationResponseEvent.data.responses[0]
92 response.operationId == 'some-operation-id'
93 response.statusCode == 'any-success-status-code'
94 response.statusMessage == 'Successfully applied changes'
95 response.responseContent as String == '[some-key:some-value]'
98 def 'Filter an event with type #eventType'() {
99 given: 'consumer record for event with type #eventType'
100 def consumerRecord = createConsumerRecord(eventType)
101 when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy'
102 def result = dataOperationRecordFilterStrategy.filter(consumerRecord)
103 then: 'the event is #description'
104 assert result == expectedResult
105 where: 'filter the event based on the eventType #eventType'
106 description | eventType || expectedResult
107 'not filtered(the consumer will see the event)' | dataOperationType || false
108 'filtered(the consumer will not see the event)' | 'wrongType' || true
111 def createConsumerRecord(eventTypeAsString) {
112 def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
113 def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class))
115 CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes)
117 def headers = new RecordHeaders()
118 def cloudEventSerializer = new CloudEventSerializer()
119 cloudEventSerializer.serialize(clientTopic, headers, cloudEvent)
121 def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent)
122 headers.forEach(header -> consumerRecord.headers().add(header))
123 return consumerRecord
126 def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) {
127 return CloudEventBuilder.v1()
129 .withType(eventTypeAsString)
130 .withSource(URI.create("sample-test-source"))
131 .withData(testEventSentAsBytes)
132 .withExtension("correlationid", "request-id")
133 .withExtension("destination", clientTopic)