- Modified data operation record strategy to consume cloud event.
- Modified NCMP data operation event consumer to read cloud event
header. (prefixed with ce_)
- Modified event publisher to support legacy and cloud event based on
event type (if legacy event use legacy kafka template else cloud kafka
template).
- Introduced a new method onto json object mapper to convert json object
to bytes.
- Modified data operation consumer spec to produce a cloud event and validate it.
- Added Kafka Integration Test (for filtering)
Issue-ID: CPS-1724
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
Change-Id: Ide701b1ff952f57413cb4e4aa0d55c08753f0298
Signed-off-by: sourabh_sourabh <sourabh.sourabh@est.tech>
<artifactId>spock-spring</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
package org.onap.cps.ncmp.api.impl.async;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.kafka.common.header.Header;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
* @return boolean value.
*/
@Bean
- public RecordFilterStrategy<String, DataOperationEvent> includeDataOperationEventsOnly() {
+ public RecordFilterStrategy<String, CloudEvent> includeDataOperationEventsOnly() {
return consumedRecord -> {
- final Header eventTypeHeader = consumedRecord.headers().lastHeader("eventType");
- if (eventTypeHeader == null) {
- return false;
- }
- final String eventTypeHeaderValue = SerializationUtils.deserialize(eventTypeHeader.value());
- return !(eventTypeHeaderValue != null
- && eventTypeHeaderValue.contains("DataOperationEvent"));
+ final String eventTypeHeaderValue = KafkaHeaders.getParsedKafkaHeader(
+ consumedRecord.headers(), "ce_type");
+ return !(eventTypeHeaderValue.contains("DataOperationEvent"));
};
}
}
package org.onap.cps.ncmp.api.impl.async;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.kafka.impl.KafkaHeaders;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.events.EventsPublisher;
-import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class NcmpAsyncDataOperationEventConsumer {
- private final EventsPublisher<DataOperationEvent> eventsPublisher;
+ private final EventsPublisher<CloudEvent> eventsPublisher;
/**
* Consume the DataOperationResponseEvent published by producer to topic 'async-m2m.topic'
filter = "includeDataOperationEventsOnly",
groupId = "ncmp-data-operation-event-group",
properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent"})
- public void consumeAndPublish(final ConsumerRecord<String, DataOperationEvent>
- dataOperationEventConsumerRecord) {
+ public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
log.info("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
- final String eventTarget = SerializationUtils
- .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventTarget").value());
- final String eventId = SerializationUtils
- .deserialize(dataOperationEventConsumerRecord.headers().lastHeader("eventId").value());
- eventsPublisher.publishEvent(eventTarget, eventId, dataOperationEventConsumerRecord.headers(),
- dataOperationEventConsumerRecord.value());
+ final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
+ dataOperationEventConsumerRecord.headers(), "ce_destination");
+ final String eventId = KafkaHeaders.getParsedKafkaHeader(
+ dataOperationEventConsumerRecord.headers(), "ce_id");
+ eventsPublisher.publishCloudEvent(eventTarget, eventId, dataOperationEventConsumerRecord.value());
}
}
* @param event message payload
* @deprecated This method is not needed anymore since the use of headers will be in place.
*/
- @Deprecated
+ @Deprecated(forRemoval = true)
public void publishEvent(final String topicName, final String eventKey, final T event) {
final ListenableFuture<SendResult<String, T>> eventFuture
= legacyKafkaEventTemplate.send(topicName, eventKey, event);
--- /dev/null
+package org.onap.cps.ncmp.api.impl.async
+
+import io.cloudevents.CloudEvent
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.kafka.CloudEventSerializer
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.onap.cps.ncmp.api.impl.events.EventsPublisher
+import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
+import org.spockframework.spring.SpringBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry
+import org.springframework.kafka.test.utils.ContainerTestUtils
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+import java.util.concurrent.TimeUnit
+
+@SpringBootTest(classes =[NcmpAsyncDataOperationEventConsumer, DataOperationRecordFilterStrategy])
+@DirtiesContext
+@Testcontainers
+@EnableAutoConfiguration
+class NcmpAsyncDataOperationEventConsumerIntegrationSpec extends MessagingBaseSpec {
+
+ @Autowired
+ private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
+
+ @SpringBean
+ EventsPublisher mockEventsPublisher = Mock()
+
+ def activateListeners() {
+ kafkaListenerEndpointRegistry.getListenerContainers().forEach(
+ messageListenerContainer -> { ContainerTestUtils.waitForAssignment(messageListenerContainer, 1) }
+ )
+ }
+
+ def 'Filtering Events.'() {
+ given: 'a cloud event of type: #eventType'
+ def cloudEvent = CloudEventBuilder.v1().withId("some-uuid")
+ .withType(eventType)
+ .withSource(URI.create("sample-test-source"))
+ .build();
+ and: 'activate message listener container'
+ activateListeners()
+ when: 'send the cloud event'
+ ProducerRecord<String, CloudEvent> record = new ProducerRecord<>('ncmp-async-m2m', cloudEvent)
+ KafkaProducer<String, CloudEvent> producer = new KafkaProducer<>(eventProducerConfigProperties(CloudEventSerializer))
+ producer.send(record);
+ and: 'wait a little for async processing of message'
+ TimeUnit.MILLISECONDS.sleep(100)
+ then: 'the event has only been forwarded for the correct type'
+ expectedNUmberOfCallsToPublishForwardedEvent * mockEventsPublisher.publishCloudEvent(_, _, _)
+ where:
+ eventType || expectedNUmberOfCallsToPublishForwardedEvent
+ 'DataOperationEvent' || 1
+ 'other type' || 0
+ 'any type contain the word "DataOperationEvent"' || 1
+ }
+}
+
package org.onap.cps.ncmp.api.impl.async
import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.commons.lang3.SerializationUtils
+import io.cloudevents.CloudEvent
+import io.cloudevents.kafka.CloudEventDeserializer
+import io.cloudevents.kafka.CloudEventSerializer
+import io.cloudevents.kafka.impl.KafkaHeaders
+import io.cloudevents.core.CloudEventUtils
+import io.cloudevents.core.builder.CloudEventBuilder
+import io.cloudevents.jackson.PojoCloudEventDataMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.common.header.internals.RecordHeaders
import org.onap.cps.ncmp.api.impl.events.EventsPublisher
import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
class NcmpAsyncDataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<DataOperationEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
- NcmpAsyncDataOperationEventConsumer asyncDataOperationEventConsumer = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
+ NcmpAsyncDataOperationEventConsumer objectUnderTest = new NcmpAsyncDataOperationEventConsumer(asyncDataOperationEventPublisher)
@Autowired
JsonObjectMapper jsonObjectMapper
@Autowired
- RecordFilterStrategy<String, DataOperationEvent> recordFilterStrategy
+ RecordFilterStrategy<String, CloudEvent> dataOperationRecordFilterStrategy
- def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', StringDeserializer))
+ @Autowired
+ ObjectMapper objectMapper
+
+ def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test', CloudEventDeserializer))
def static clientTopic = 'client-topic'
def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
def 'Consume and publish event to client specified topic'() {
given: 'consumer subscribing to client topic'
- legacyEventKafkaConsumer.subscribe([clientTopic])
+ cloudEventKafkaConsumer.subscribe([clientTopic])
and: 'consumer record for data operation event'
def consumerRecordIn = createConsumerRecord(dataOperationType)
when: 'the data operation event is consumed and published to client specified topic'
- asyncDataOperationEventConsumer.consumeAndPublish(consumerRecordIn)
+ objectUnderTest.consumeAndPublish(consumerRecordIn)
and: 'the client specified topic is polled'
- def consumerRecordOut = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
- then: 'verifying consumed event operationID is same as published event operationID'
- def operationIdIn = consumerRecordIn.value.data.responses[0].operationId
- def operationIdOut = jsonObjectMapper.convertJsonString((String)consumerRecordOut.value(), DataOperationEvent.class).data.responses[0].operationId
- assert operationIdIn == operationIdOut
+ def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
+ then: 'verify cloud compliant headers'
+ def consumerRecordOutHeaders = consumerRecordOut.headers()
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_correlationid') == 'request-id'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_id') == 'some-uuid'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_type') == dataOperationType
+ and: 'verify that extension is included into header'
+ assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
+ and: 'map consumer record to expected event type'
+ def dataOperationResponseEvent = CloudEventUtils.mapData(consumerRecordOut.value(),
+ PojoCloudEventDataMapper.from(objectMapper, DataOperationEvent.class)).getValue()
+ and: 'verify published response data properties'
+ def response = dataOperationResponseEvent.data.responses[0]
+ response.operationId == 'some-operation-id'
+ response.statusCode == 'any-success-status-code'
+ response.statusMessage == 'Successfully applied changes'
+ response.responseContent as String == '[some-key:some-value]'
}
def 'Filter an event with type #eventType'() {
given: 'consumer record for event with type #eventType'
def consumerRecord = createConsumerRecord(eventType)
when: 'while consuming the topic ncmp-async-m2m it executes the filter strategy'
- def result = recordFilterStrategy.filter(consumerRecord)
+ def result = dataOperationRecordFilterStrategy.filter(consumerRecord)
then: 'the event is #description'
assert result == expectedResult
where: 'filter the event based on the eventType #eventType'
def createConsumerRecord(eventTypeAsString) {
def jsonData = TestUtils.getResourceFileContent('dataOperationEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class)
- def eventTarget = SerializationUtils.serialize(clientTopic)
- def eventType = SerializationUtils.serialize(eventTypeAsString)
- def eventId = SerializationUtils.serialize('12345')
- def consumerRecord = new ConsumerRecord<String, Object>(clientTopic, 1, 1L, '123', testEventSent)
- consumerRecord.headers().add(new RecordHeader('eventId', eventId))
- consumerRecord.headers().add(new RecordHeader('eventTarget', eventTarget))
- consumerRecord.headers().add(new RecordHeader('eventType', eventType))
+ def testEventSentAsBytes = objectMapper.writeValueAsBytes(jsonObjectMapper.convertJsonString(jsonData, DataOperationEvent.class))
+
+ CloudEvent cloudEvent = getCloudEvent(eventTypeAsString, testEventSentAsBytes)
+
+ def headers = new RecordHeaders()
+ def cloudEventSerializer = new CloudEventSerializer()
+ cloudEventSerializer.serialize(clientTopic, headers, cloudEvent)
+
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>(clientTopic, 0, 0L, 'sample-message-key', cloudEvent)
+ headers.forEach(header -> consumerRecord.headers().add(header))
return consumerRecord
}
+
+ def getCloudEvent(eventTypeAsString, byte[] testEventSentAsBytes) {
+ return CloudEventBuilder.v1()
+ .withId("some-uuid")
+ .withType(eventTypeAsString)
+ .withSource(URI.create("sample-test-source"))
+ .withData(testEventSentAsBytes)
+ .withExtension("correlationid", "request-id")
+ .withExtension("destination", clientTopic)
+ .build();
+ }
}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.test.annotation.DirtiesContext
import org.testcontainers.spock.Testcontainers
-
import java.time.Duration
@SpringBootTest(classes = [EventsPublisher, AvcEventConsumer, ObjectMapper, JsonObjectMapper])
assert records.size() == 1
and: 'record can be converted to AVC event'
def record = records.iterator().next()
- def cloudevent = record.value() as CloudEvent
- def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue()
+ def cloudEvent = record.value() as CloudEvent
+ def convertedAvcEvent = CloudEventUtils.mapData(cloudEvent, PojoCloudEventDataMapper.from(new ObjectMapper(), AvcEvent.class)).getValue()
and: 'we have correct headers forwarded where correlation id matches'
assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1'
and: 'event id differs(as per requirement) between consumed and forwarded'
import com.fasterxml.jackson.databind.ObjectMapper
import spock.lang.Specification
-
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
app:
ncmp:
+ async-m2m:
+ topic: ncmp-async-m2m
avc:
subscription-topic: cm-avc-subscription
cm-events-topic: cm-events
{
- "data":{
- "responses":[
+ "data": {
+ "responses": [
{
- "operationId":"1",
- "ids":[
- "123",
- "124"
+ "operationId": "some-operation-id",
+ "ids": [
+ "cm-handle-id"
],
- "statusCode":1,
- "statusMessage":"Batch operation success on the above cmhandle ids ",
- "responseContent":{
- "ietf-netconf-monitoring:netconf-state":{
- "schemas":{
- "schema":[
- {
- "identifier":"ietf-tls-server",
- "version":"2016-11-02",
- "format":"ietf-netconf-monitoring:yang",
- "namespace":"urn:ietf:params:xml:ns:yang:ietf-tls-server",
- "location":[
- "NETCONF"
- ]
- }
- ]
- }
- }
+ "statusCode": "any-success-status-code",
+ "statusMessage": "Successfully applied changes",
+ "responseContent": {
+ "some-key": "some-value"
}
}
]