Merge "Improve performance of updateDataLeaves"
[cps.git] / cps-ncmp-service / src / test / groovy / org / onap / cps / ncmp / api / impl / async / NcmpAsyncDataOperationEventConsumerSpec.groovy
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2023 Nordix Foundation
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.api.impl.async
22
23 import com.fasterxml.jackson.databind.ObjectMapper
24 import io.cloudevents.CloudEvent
25 import io.cloudevents.kafka.CloudEventDeserializer
26 import io.cloudevents.kafka.CloudEventSerializer
27 import io.cloudevents.kafka.impl.KafkaHeaders
28 import io.cloudevents.core.CloudEventUtils
29 import io.cloudevents.core.builder.CloudEventBuilder
30 import io.cloudevents.jackson.PojoCloudEventDataMapper
31 import org.apache.kafka.clients.consumer.ConsumerRecord
32 import org.apache.kafka.clients.consumer.KafkaConsumer
33 import org.apache.kafka.common.header.internals.RecordHeaders
34 import org.onap.cps.ncmp.api.impl.events.EventsPublisher
35 import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
36 import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
37 import org.onap.cps.ncmp.utils.TestUtils
38 import org.onap.cps.utils.JsonObjectMapper
39 import org.spockframework.spring.SpringBean
40 import org.springframework.beans.factory.annotation.Autowired
41 import org.springframework.boot.test.context.SpringBootTest
42 import org.springframework.kafka.listener.adapter.RecordFilterStrategy
43 import org.springframework.test.annotation.DirtiesContext
44 import org.testcontainers.spock.Testcontainers
45 import java.time.Duration
46
// Spring context limited to the beans needed for consuming, filtering and
// re-publishing data operation cloud events.
@SpringBootTest(classes = [EventsPublisher, NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy, JsonObjectMapper, ObjectMapper])
@Testcontainers
@DirtiesContext
class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {

    @SpringBean
    EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)

    @SpringBean
    NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)

    @Autowired
    JsonObjectMapper jsonObjectMapper

    @Autowired
    RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy

    @Autowired
    ObjectMapper objectMapper

    def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
    def static clientTopic = 'client-topic'
    def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'

    def 'Consume and publish event to client specified topic'() {
        given: 'a consumer subscribed to the client topic'
            cloudEventKafkaConsumer.subscribe([clientTopic])
        and: 'a consumer record holding a data operation event'
            def inboundConsumerRecord = createConsumerRecord(dataOperationType)
        when: 'the data operation event is consumed and published to the client specified topic'
            objectUnderTest.consumeAndPublish(inboundConsumerRecord)
        and: 'the client topic is polled for the forwarded record'
            def forwardedRecord = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
        then: 'the forwarded record carries cloud event compliant headers'
            def forwardedHeaders = forwardedRecord.headers()
            assert KafkaHeaders.getParsedKafkaHeader(forwardedHeaders, 'ce_correlationid') == 'request-id'
            assert KafkaHeaders.getParsedKafkaHeader(forwardedHeaders, 'ce_id') == 'some-uuid'
            assert KafkaHeaders.getParsedKafkaHeader(forwardedHeaders, 'ce_type') == dataOperationType
        and: 'the destination extension is included in the headers'
            assert KafkaHeaders.getParsedKafkaHeader(forwardedHeaders, 'ce_destination') == clientTopic
        and: 'the record payload can be mapped to the expected event type'
            def dataOperationResponseEvent = CloudEventUtils.mapData(forwardedRecord.value(),
                    PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
        and: 'the published response data has the expected properties'
            def response = dataOperationResponseEvent.data.responses[0]
            response.operationId == 'some-operation-id'
            response.statusCode == 'any-success-status-code'
            response.statusMessage == 'Successfully applied changes'
            response.responseContent as String == '[some-key:some-value]'
    }

    def 'Filter an event with type #eventType'() {
        given: 'a consumer record for an event with type #eventType'
            def consumerRecord = createConsumerRecord(eventType)
        when: 'the filter strategy used for topic ncmp-async-m2m is applied'
            def result = dataOperationRecordFilterStrategy.filter(consumerRecord)
        then: 'the event is #description'
            assert result == expectedResult
        where: 'the event is filtered based on the eventType #eventType'
            description                                     | eventType         || expectedResult
            'not filtered(the consumer will see the event)' | dataOperationType || false
            'filtered(the consumer will not see the event)' | 'wrongType'       || true
    }

    // Builds a consumer record wrapping a cloud event of the given type,
    // with the serialized cloud event ('ce_') headers attached.
    def createConsumerRecord(eventTypeAsString) {
        def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
        def eventAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class))
        def cloudEvent = getCloudEvent(eventTypeAsString, eventAsBytes)
        // Serializing the event populates the headers with its cloud event attributes.
        def headers = new RecordHeaders()
        new CloudEventSerializer().serialize(clientTopic, headers, cloudEvent)
        def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent)
        headers.forEach(header -> consumerRecord.headers().add(header))
        return consumerRecord
    }

    // Creates a v1 cloud event with the given type, payload and test extensions.
    def getCloudEvent(eventTypeAsString, byte[] eventAsBytes) {
        return CloudEventBuilder.v1()
                .withId('some-uuid')
                .withType(eventTypeAsString)
                .withSource(URI.create('sample-test-source'))
                .withData(eventAsBytes)
                .withExtension('correlationid', 'request-id')
                .withExtension('destination', clientTopic)
                .build()
    }
}