package org.onap.cps.ncmp.api.impl.async
import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.commons.lang3.SerializationUtils
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import io.cloudevents.kafka.impl.KafkaHeaders
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.header.internals.RecordHeaders
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
- NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
+ NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
@Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
- RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy
+ RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
- def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
+ @Autowired
+ ObjectMapper objectMapper
+
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
def static clientTopic = 'client-topic'
def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
def 'Consume and publish event to client specified topic'() {
given: 'consumer subscribing to client topic'
- legacyEventKafkaConsumer.subscribe([clientTopic])
+ cloudEventKafkaConsumer.subscribe([clientTopic])
and: 'consumer record for data operation event'
def consumerRecordIn = createConsumerRecord(dataOperationType)
when: 'the data operation event is consumed and published to client specified topic'
- asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn)
+ objectUnderTest.consumeAndPublish(consumerRecordIn)
and: 'the client specified topic is polled'
- def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
- then: 'verifying consumed event operationID is same as published event operationID'
- def operationIdIn = consumerRecordIn.value.data.responses[0].operationId
- def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId
- assert operationIdIn == operationIdOut
+ def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+ then: 'verify cloud compliant headers'
+ def consumerRecordOutHeaders = consumerRecordOut.headers()
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType
+ and: 'verify that extension is included into header'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
+ and: 'map consumer record to expected event type'
+ def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
+ PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+ and: 'verify published response data properties'
+ def response = dataOperationResponseEvent.data.responses[0]
+ response.operationId == 'some-operation-id'
+ response.statusCode == 'any-success-status-code'
+ response.statusMessage == 'Successfully applied changes'
+ response.responseContent as String == '[some-key:some-value]'
}
def 'Filter an event with type #eventType'() {
given: 'consumer record for event with type #eventType'
def consumerRecord = createConsumerRecord(eventType)
when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy'
- def result = recordFilterStrategy.filter(consumerRecord)
+ def result = dataOperationRecordFilterStrategy.filter(consumerRecord)
then: 'the event is #description'
assert result == expectedResult
where: 'filter the event based on the eventType #eventType'
def createConsumerRecord(eventTypeAsString) {
def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)
- def eventTarget = SerializationUtils.serialize(clientTopic)
- def eventType = SerializationUtils.serialize(eventTypeAsString)
- def eventId = SerializationUtils.serialize('12345')
- def consumerRecord = new ConsumerRecord<String, Object>(clientTopic, 1, 1L, '123', testEventSent)
- consumerRecord.headers().add(new RecordHeader('eventId', eventId))
- consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget))
- consumerRecord.headers().add(new RecordHeader('eventType', eventType))
+ def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class))
+
+ CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes)
+
+ def headers = new RecordHeaders()
+ def cloudEventSerializer = new CloudEventSerializer()
+ cloudEventSerializer.serialize(clientTopic, headers, cloudEvent)
+
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent)
+ headers.forEach(header -> consumerRecord.headers().add(header))
return consumerRecord
}
+
+ def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) {
+ return CloudEventBuilder.v1()
+ .withId("some-uuid")
+ .withType(eventTypeAsString)
+ .withSource(URI.create("sample-test-source"))
+ .withData(testEventSentAsBytes)
+ .withExtension("correlationid", "request-id")
+ .withExtension("destination", clientTopic)
+ .build();
+ }
}