From: Luke Gleeson Date: Tue, 20 Jun 2023 08:38:01 +0000 (+0000) Subject: Merge "DMI Data AVC to cloud events" X-Git-Tag: 3.3.3~33 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=4662655882edc48dd66fa83a9adf881362cb52ac;hp=7eae3fd589942c856f365600820aed18d104a98c;p=cps.git Merge "DMI Data AVC to cloud events" --- diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json deleted file mode 100644 index ea1e617c8..000000000 --- a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-header-v1.json +++ /dev/null @@ -1,50 +0,0 @@ -{ - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-header-schema:v1", - "$ref": "#/definitions/AvcEventHeader", - "definitions": { - "AvcEventHeader": { - "description": "The header for AVC event.", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.avc.v1.AvcEventHeader", - "properties": { - "eventId": { - "description": "The unique id identifying the event generated by DMI for this AVC event.", - "type": "string" - }, - "eventCorrelationId": { - "description": "The request id passed by NCMP for this AVC event.", - "type": "string" - }, - "eventTime": { - "description": "The time of the AVC event. The expected format is 'yyyy-MM-dd'T'HH:mm:ss.SSSZ'.", - "type": "string" - }, - "eventSource": { - "description": "The source of the AVC event.", - "type": "string" - }, - "eventType": { - "description": "The type of the AVC event.", - "type": "string" - }, - "eventSchema": { - "description": "The event schema for AVC events.", - "type": "string" - }, - "eventSchemaVersion": { - "description": "The event schema version for AVC events.", - "type": "string" - } - }, - "required": [ - "eventId", - "eventCorrelationId", - "eventType", - "eventSchema", - "eventSchemaVersion" - ], - "additionalProperties": false - } - } -} \ No newline at end of file diff --git a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json similarity index 92% rename from cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json rename to cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json index 7e975c9b9..a5bed939b 100644 --- a/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-v1.json +++ b/cps-ncmp-events/src/main/resources/schemas/dmidataavc/avc-event-schema-1.0.0.json @@ -1,10 +1,9 @@ { "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:v1", + "$id": "urn:cps:org.onap.cps.ncmp.events:avc-event-schema:1.0.0", "$ref": "#/definitions/AvcEvent", "definitions": { "Edit": { - "javaType": "org.onap.cps.ncmp.events.avc.v1.Edit", "additionalProperties": false, "properties": { "edit-id": { @@ -48,9 +47,9 @@ "AvcEvent": { "description": "The payload for AVC event.", "type": "object", - "javaType": "org.onap.cps.ncmp.events.avc.v1.AvcEvent", + "javaType": "org.onap.cps.ncmp.events.avc1_0_0.AvcEvent", "properties": { - "event": { + "data": { "description": "The AVC event content compliant with RFC8641 format", "type": "object", "additionalProperties": false, @@ -99,7 +98,7 @@ } }, "required": [ - "event" + "data" ], "additionalProperties": false } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java index 7b28b4cd5..e61e7729b 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/EventsPublisher.java @@ -50,6 +50,19 @@ public class EventsPublisher { private final KafkaTemplate cloudEventKafkaTemplate; + /** + * Generic CloudEvent publisher. + * + * @param topicName valid topic name + * @param eventKey message key + * @param event message payload + */ + public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) { + final ListenableFuture> eventFuture + = cloudEventKafkaTemplate.send(topicName, eventKey, event); + eventFuture.addCallback(handleCallback(topicName)); + } + /** * Generic Event publisher. * @@ -95,7 +108,7 @@ public class EventsPublisher { publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } - private ListenableFutureCallback> handleCallback(final String topicName) { + private ListenableFutureCallback> handleCallback(final String topicName) { return new ListenableFutureCallback<>() { @Override public void onFailure(final Throwable throwable) { @@ -103,7 +116,7 @@ public class EventsPublisher { } @Override - public void onSuccess(final SendResult sendResult) { + public void onSuccess(final SendResult sendResult) { log.debug("Successfully published event to topic : {} , Event : {}", sendResult.getRecordMetadata().topic(), sendResult.getProducerRecord().value()); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java index f37497abe..b5ca176d1 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumer.java @@ -20,19 +20,17 @@ package org.onap.cps.ncmp.api.impl.events.avc; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import java.util.UUID; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; import org.onap.cps.ncmp.api.impl.events.EventsPublisher; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; -import org.springframework.util.SerializationUtils; /** * Listener for AVC events. @@ -47,34 +45,19 @@ public class AvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; - private final EventsPublisher eventsPublisher; - private final AvcEventMapper avcEventMapper; - + private final EventsPublisher eventsPublisher; /** * Incoming AvcEvent in the form of Consumer Record. * * @param avcEventConsumerRecord Incoming raw consumer record */ - @KafkaListener(topics = "${app.dmi.cm-events.topic}", - properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.avc.v1.AvcEvent"}) - public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { + @KafkaListener(topics = "${app.dmi.cm-events.topic}") + public void consumeAndForward(final ConsumerRecord avcEventConsumerRecord) { log.debug("Consuming AVC event {} ...", avcEventConsumerRecord.value()); - final String mutatedEventId = UUID.randomUUID().toString(); - mutateEventHeaderWithEventId(avcEventConsumerRecord.headers(), mutatedEventId); - final AvcEvent outgoingAvcEvent = avcEventMapper.toOutgoingAvcEvent(avcEventConsumerRecord.value()); - eventsPublisher.publishEvent(cmEventsTopicName, mutatedEventId, avcEventConsumerRecord.headers(), - outgoingAvcEvent); - } - - private void mutateEventHeaderWithEventId(final Headers eventHeaders, final String mutatedEventId) { - final String eventId = "eventId"; - final String existingEventId = - (String) SerializationUtils.deserialize(eventHeaders.lastHeader(eventId).value()); - eventHeaders.remove(eventId); - log.info("Removing existing eventId from header : {} and updating with id : {}", existingEventId, - mutatedEventId); - eventHeaders.add(new RecordHeader(eventId, SerializationUtils.serialize(mutatedEventId))); - + final String newEventId = UUID.randomUUID().toString(); + final CloudEvent outgoingAvcEvent = + CloudEventBuilder.from(avcEventConsumerRecord.value()).withId(newEventId).build(); + eventsPublisher.publishCloudEvent(cmEventsTopicName, newEventId, outgoingAvcEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java deleted file mode 100644 index 8246ed480..000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/events/avc/AvcEventMapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.api.impl.events.avc; - -import org.mapstruct.Mapper; -import org.onap.cps.ncmp.events.avc.v1.AvcEvent; - - -/** - * Mapper for converting incoming {@link AvcEvent} to outgoing {@link AvcEvent}. - */ -@Mapper(componentModel = "spring") -public interface AvcEventMapper { - - AvcEvent toOutgoingAvcEvent(AvcEvent incomingAvcEvent); - -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy index 3dffac714..4a9e3ee81 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/events/avc/AvcEventConsumerSpec.groovy @@ -21,21 +21,23 @@ package org.onap.cps.ncmp.api.impl.events.avc import com.fasterxml.jackson.databind.ObjectMapper +import io.cloudevents.CloudEvent +import io.cloudevents.core.CloudEventUtils +import io.cloudevents.core.builder.CloudEventBuilder +import io.cloudevents.jackson.PojoCloudEventDataMapper +import io.cloudevents.kafka.CloudEventDeserializer +import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.serialization.StringDeserializer -import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.impl.events.EventsPublisher import org.onap.cps.ncmp.api.kafka.MessagingBaseSpec -import org.onap.cps.ncmp.events.avc.v1.AvcEvent +import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.utils.JsonObjectMapper import org.spockframework.spring.SpringBean import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.test.annotation.DirtiesContext -import org.springframework.util.SerializationUtils import org.testcontainers.spock.Testcontainers import java.time.Duration @@ -46,52 +48,49 @@ import java.time.Duration class AvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean - AvcEventMapper avcEventMapper = Mappers.getMapper(AvcEventMapper.class) + EventsPublisher eventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean - EventsPublisher eventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate) - - @SpringBean - AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher, avcEventMapper) + AvcEventConsumer acvEventConsumer = new AvcEventConsumer(eventsPublisher) @Autowired JsonObjectMapper jsonObjectMapper - def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) + @Autowired + ObjectMapper objectMapper + + def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', CloudEventDeserializer)) def 'Consume and forward valid message'() { given: 'consumer has a subscription on a topic' def cmEventsTopicName = 'cm-events' acvEventConsumer.cmEventsTopicName = cmEventsTopicName - legacyEventKafkaConsumer.subscribe([cmEventsTopicName] as List) + cloudEventKafkaConsumer.subscribe([cmEventsTopicName] as List) and: 'an event is sent' def jsonData = TestUtils.getResourceFileContent('sampleAvcInputEvent.json') def testEventSent = jsonObjectMapper.convertJsonString(jsonData, AvcEvent.class) + def testCloudEventSent = CloudEventBuilder.v1() + .withData(objectMapper.writeValueAsBytes(testEventSent)) + .withId('sample-eventid') + .withType('sample-test-type') + .withSource(URI.create('sample-test-source')) + .withExtension('correlationid', 'test-cmhandle1').build() and: 'event has header information' - def consumerRecord = new ConsumerRecord(cmEventsTopicName,0, 0, 'sample-eventid', testEventSent) - consumerRecord.headers().add(new RecordHeader('eventId', SerializationUtils.serialize('sample-eventid'))) - consumerRecord.headers().add(new RecordHeader('eventCorrelationId', SerializationUtils.serialize('cmhandle1'))) + def consumerRecord = new ConsumerRecord(cmEventsTopicName, 0, 0, 'sample-eventid', testCloudEventSent) when: 'the event is consumed' acvEventConsumer.consumeAndForward(consumerRecord) and: 'the topic is polled' - def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) + def records = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record can be converted to AVC event' def record = records.iterator().next() - def convertedAvcEvent = jsonObjectMapper.convertJsonString(record.value(), AvcEvent.class) + def cloudevent = record.value() as CloudEvent + def convertedAvcEvent = CloudEventUtils.mapData(cloudevent, PojoCloudEventDataMapper.from(objectMapper, AvcEvent.class)).getValue() and: 'we have correct headers forwarded where correlation id matches' - record.headers().forEach(header -> { - if (header.key().equals('eventCorrelationId')) { - assert SerializationUtils.deserialize(header.value()) == 'cmhandle1' - } - }) + assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_correlationid') == 'test-cmhandle1' and: 'event id differs(as per requirement) between consumed and forwarded' - record.headers().forEach(header -> { - if (header.key().equals('eventId')) { - assert SerializationUtils.deserialize(header.value()) != 'sample-eventid' - } - }) + assert KafkaHeaders.getParsedKafkaHeader(record.headers(), 'ce_id') != 'sample-eventid' and: 'the event payload still matches' assert testEventSent == convertedAvcEvent } diff --git a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json index 569343fed..5b297c86c 100644 --- a/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json +++ b/cps-ncmp-service/src/test/resources/sampleAvcInputEvent.json @@ -1,5 +1,5 @@ { - "event":{ + "data":{ "push-change-update":{ "datastore-changes":{ "ietf-yang-patch:yang-patch":{