From: mpriyank Date: Wed, 24 Sep 2025 12:22:21 +0000 (+0100) Subject: Refactor EventsProducer class to remove Generic type for legacyevent X-Git-Tag: 3.7.1~2^2 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=fb8719e95b86724e995212492b725b7d9bb90cce;p=cps.git Refactor EventsProducer class to remove Generic type for legacyevent - EventsProducer refactored to not use generic type and instead use LegacyEvent type for all the events that still uses non-cloud events. - Events are implementing LegacyEvent interface now just to differentiate b/w cloud and non-cloud events when creating a kafkatemplate to send events Issue-ID: CPS-2990 Change-Id: Ic905ceadefca9e492cb999b633c86d13227c30c2 Signed-off-by: mpriyank --- diff --git a/cps-events/src/main/java/org/onap/cps/events/LegacyEvent.java b/cps-events/src/main/java/org/onap/cps/events/LegacyEvent.java new file mode 100644 index 0000000000..26705c24ed --- /dev/null +++ b/cps-events/src/main/java/org/onap/cps/events/LegacyEvent.java @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.events; + +/** + * Marker interface used to categorize legacy events under a common type hierarchy. + */ +public interface LegacyEvent { } diff --git a/cps-ncmp-events/pom.xml b/cps-ncmp-events/pom.xml index 997e39fbc0..5ac8a3bcf9 100644 --- a/cps-ncmp-events/pom.xml +++ b/cps-ncmp-events/pom.xml @@ -43,6 +43,10 @@ org.projectlombok lombok + + org.onap.cps + cps-events + diff --git a/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-schema-v1.json index bd0d90d04a..c79af2e49d 100644 --- a/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-schema-v1.json +++ b/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-schema-v1.json @@ -68,6 +68,7 @@ "description": "The payload for LCM event", "type": "object", "javaType" : "org.onap.cps.ncmp.events.lcm.v1.LcmEvent", + "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"], "properties": { "eventId": { "description": "The unique id identifying the event", diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/data-operation-event-schema-1.0.0.json b/cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/data-operation-event-schema-1.0.0.json index c2915187c7..088f2b5d3c 100644 --- a/cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/data-operation-event-schema-1.0.0.json +++ b/cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/data-operation-event-schema-1.0.0.json @@ -7,6 +7,7 @@ "description": "The payload of data operation event.", "type": "object", "javaType" : "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent", + "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"], "properties": { "data": { "description": "The payload content of the requested data.", diff --git a/cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/ncmp-async-request-response-event-schema-v1.json b/cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/ncmp-async-request-response-event-schema-v1.json index cb10f75882..4a00fce440 100644 --- a/cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/ncmp-async-request-response-event-schema-v1.json +++ b/cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/ncmp-async-request-response-event-schema-v1.json @@ -7,6 +7,7 @@ "description": "The payload for CPS async request response event.", "type": "object", "javaType" : "org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent", + "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"], "properties": { "eventId": { "description": "The unique id identifying the event generated by DMI.", diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java index 8475be6f6a..be70833271 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java @@ -28,6 +28,7 @@ import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; +import org.onap.cps.events.LegacyEvent; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.ssl.SslBundles; @@ -46,13 +47,11 @@ import org.springframework.kafka.support.serializer.JsonSerializer; /** * kafka Configuration for legacy and cloud events. - * - * @param valid legacy event to be sent over the wire. */ @Configuration @EnableKafka @RequiredArgsConstructor -public class KafkaConfig { +public class KafkaConfig { private final KafkaProperties kafkaProperties; @@ -68,12 +67,12 @@ public class KafkaConfig { * @return legacy event producer instance. */ @Bean - public ProducerFactory legacyEventProducerFactory() { + public ProducerFactory legacyEventProducerFactory() { final Map producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL); producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); if (tracingEnabled) { - producerConfigProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingProducerInterceptor.class.getName()); } return new DefaultKafkaProducerFactory<>(producerConfigProperties); } @@ -85,12 +84,12 @@ public class KafkaConfig { * @return an instance of legacy consumer factory. */ @Bean - public ConsumerFactory legacyEventConsumerFactory() { + public ConsumerFactory legacyEventConsumerFactory() { final Map consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL); consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class); if (tracingEnabled) { - consumerConfigProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingConsumerInterceptor.class.getName()); } return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); } @@ -102,8 +101,8 @@ public class KafkaConfig { */ @Bean @Primary - public KafkaTemplate legacyEventKafkaTemplate() { - final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); + public KafkaTemplate legacyEventKafkaTemplate() { + final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory()); kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory()); if (tracingEnabled) { kafkaTemplate.setObservationEnabled(true); @@ -117,8 +116,9 @@ public class KafkaConfig { * @return instance of Concurrent kafka listener factory */ @Bean - public ConcurrentKafkaListenerContainerFactory legacyEventConcurrentKafkaListenerContainerFactory() { - final ConcurrentKafkaListenerContainerFactory containerFactory = + public ConcurrentKafkaListenerContainerFactory + legacyEventConcurrentKafkaListenerContainerFactory() { + final ConcurrentKafkaListenerContainerFactory containerFactory = new ConcurrentKafkaListenerContainerFactory<>(); containerFactory.setConsumerFactory(legacyEventConsumerFactory()); containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10)); @@ -138,8 +138,8 @@ public class KafkaConfig { public ProducerFactory cloudEventProducerFactory() { final Map producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL); if (tracingEnabled) { - producerConfigProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName()); + producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingProducerInterceptor.class.getName()); } return new DefaultKafkaProducerFactory<>(producerConfigProperties); } @@ -154,8 +154,8 @@ public class KafkaConfig { public ConsumerFactory cloudEventConsumerFactory() { final Map consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL); if (tracingEnabled) { - consumerConfigProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName()); + consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, + TracingConsumerInterceptor.class.getName()); } return new DefaultKafkaConsumerFactory<>(consumerConfigProperties); } @@ -168,8 +168,7 @@ public class KafkaConfig { */ @Bean public KafkaTemplate cloudEventKafkaTemplate() { - final KafkaTemplate kafkaTemplate = - new KafkaTemplate<>(cloudEventProducerFactory()); + final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory()); kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory()); if (tracingEnabled) { kafkaTemplate.setObservationEnabled(true); @@ -184,7 +183,7 @@ public class KafkaConfig { */ @Bean public ConcurrentKafkaListenerContainerFactory - cloudEventConcurrentKafkaListenerContainerFactory() { + cloudEventConcurrentKafkaListenerContainerFactory() { final ConcurrentKafkaListenerContainerFactory containerFactory = new ConcurrentKafkaListenerContainerFactory<>(); containerFactory.setConsumerFactory(cloudEventConsumerFactory()); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java index 22f20c8784..9b43837c2c 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java @@ -39,7 +39,7 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class DataOperationEventConsumer { - private final EventsProducer eventsProducer; + private final EventsProducer eventsProducer; /** * Consume the DataOperation cloud event sent by producer to topic 'async-m2m.topic' diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java index 2575508807..802e15aaa0 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java @@ -38,7 +38,7 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class DmiAsyncRequestResponseEventConsumer { - private final EventsProducer eventsProducer; + private final EventsProducer eventsProducer; private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper; /** @@ -56,7 +56,7 @@ public class DmiAsyncRequestResponseEventConsumer { log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent); final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent = ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent); - eventsProducer.sendEvent(ncmpAsyncRequestResponseEvent.getEventTarget(), + eventsProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(), ncmpAsyncRequestResponseEvent.getEventId(), ncmpAsyncRequestResponseEvent); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java index 6fddd9ed6a..8edd21fefb 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java @@ -141,8 +141,7 @@ public class DmiDataOperationsHelper { if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) { final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, requestId, cmHandleIdsPerResponseCodesPerOperation); - @SuppressWarnings("unchecked") - final EventsProducer eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class); + final EventsProducer eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class); log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}", clientTopic, requestId, dataOperationCloudEvent.getId()); eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent); diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java index cdde02dce0..0eb657388a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java @@ -52,7 +52,7 @@ public class CmAvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; - private final EventsProducer eventsProducer; + private final EventsProducer eventsProducer; private final CmAvcEventService cmAvcEventService; private final InventoryPersistence inventoryPersistence; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java index 46c9457f10..d1fccdbb12 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java @@ -37,7 +37,7 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class EventProducer { - private final EventsProducer eventsProducer; + private final EventsProducer eventsProducer; @Value("${app.ncmp.avc.cm-subscription-dmi-in}") private String dmiInEventTopic; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java index 4ee6e73abc..9e5a97d4c0 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java @@ -49,7 +49,7 @@ public class LcmEventsProducer { private static final Tag TAG_METHOD = Tag.of("method", "sendLcmEvent"); private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName()); private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A"; - private final EventsProducer eventsProducer; + private final EventsProducer eventsProducer; private final JsonObjectMapper jsonObjectMapper; private final MeterRegistry meterRegistry; @@ -75,7 +75,7 @@ public class LcmEventsProducer { @SuppressWarnings("unchecked") final Map lcmEventHeadersMap = jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class); - eventsProducer.sendEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent); + eventsProducer.sendLegacyEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent); } catch (final KafkaException e) { log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage()); } finally { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java index 8f83e28a7c..7d116cd030 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java @@ -38,7 +38,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class InventoryEventProducer { - private final EventsProducer eventsProducer; + private final EventsProducer eventsProducer; @Value("${app.ncmp.avc.inventory-events-topic}") private String ncmpInventoryEventsTopicName; diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy index 9e1649ef93..b2e42ced26 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023-2024 Nordix Foundation + * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ package org.onap.cps.ncmp.config import io.cloudevents.CloudEvent import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.CloudEventSerializer +import org.onap.cps.events.LegacyEvent import org.spockframework.spring.EnableSharedInjection import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.autoconfigure.kafka.KafkaProperties @@ -43,7 +44,7 @@ class KafkaConfigSpec extends Specification { @Shared @Autowired - KafkaTemplate legacyEventKafkaTemplate + KafkaTemplate legacyEventKafkaTemplate @Shared @Autowired diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy index 8ea73b672f..fec4fba89c 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase @SpringBean EventsProducer cpsAsyncRequestResponseEventProducer = - new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate); + new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate); @SpringBean diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy index 9c9768ab1f..7b7faf3d8a 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy @@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent class DataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @SpringBean DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy index baca4450dd..602a54e262 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy @@ -61,7 +61,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { then: 'wait a little for async processing of message (must wait to try to avoid false positives)' TimeUnit.MILLISECONDS.sleep(300) and: 'event is not consumed' - 0 * mockEventsProducer.sendEvent(*_) + 0 * mockEventsProducer.sendLegacyEvent(*_) } def 'Legacy event consumer with valid legacy event.'() { @@ -70,7 +70,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { and: 'a flag to track the send event call' def sendEventMethodCalled = false and: 'the (mocked) events producer will use the flag to indicate if it is called' - mockEventsProducer.sendEvent(*_) >> { + mockEventsProducer.sendLegacyEvent(*_) >> { sendEventMethodCalled = true } when: 'send the cloud event' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy index 65e8af8e48..b08294583e 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy @@ -80,7 +80,7 @@ class SerializationIntegrationSpec extends ConsumerBaseSpec { and: 'a flag to track the send event call' def sendEventMethodCalled = false and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event' - mockEventsProducer.sendEvent(*_) >> { + mockEventsProducer.sendLegacyEvent(*_) >> { sendEventMethodCalled = true } when: 'send the event' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy index f282b75eb0..a42bf1fbe1 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy @@ -56,7 +56,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec { JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) @SpringBean - EventsProducer eventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) def 'Process per data operation request with #serviceName.'() { given: 'data operation request with 3 operations' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy index 87e026e08a..5a0980cb02 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy @@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent class CmAvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) def mockCmAvcEventService = Mock(CmAvcEventService) def mockInventoryPersistence = Mock(InventoryPersistence) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy index 9d2511a996..ed984ec155 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy @@ -48,7 +48,7 @@ class EventsProducerSpec extends MessagingBaseSpec { def testTopic = 'ncmp-events-test' @SpringBean - EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) + EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate) @Autowired JsonObjectMapper jsonObjectMapper @@ -85,7 +85,7 @@ class EventsProducerSpec extends MessagingBaseSpec { and: 'consumer has a subscription' legacyEventKafkaConsumer.subscribe([testTopic] as List) when: 'an event is sent' - eventsProducer.sendEvent(testTopic, eventKey, eventHeader, eventData) + eventsProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData) and: 'topic is polled' def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy index d9944a707a..4bcb89aac3 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy @@ -57,7 +57,7 @@ class LcmEventsProducerSpec extends Specification { when: 'service is called to send lcm event' objectUnderTest.sendLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader) then: 'producer is called #expectedTimesMethodCalled times' - expectedTimesMethodCalled * mockLcmEventsProducer.sendEvent(_, cmHandleId, _, lcmEvent) >> { + expectedTimesMethodCalled * mockLcmEventsProducer.sendLegacyEvent(_, cmHandleId, _, lcmEvent) >> { args -> { def eventHeaders = (args[2] as Map) assert eventHeaders.containsKey('eventId') @@ -91,7 +91,7 @@ class LcmEventsProducerSpec extends Specification { def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId) objectUnderTest.notificationsEnabled = true when: 'producer set to throw an exception' - mockLcmEventsProducer.sendEvent(_, _, _, _) >> { throw new KafkaException('sending failed')} + mockLcmEventsProducer.sendLegacyEvent(_, _, _, _) >> { throw new KafkaException('sending failed')} and: 'an event is publised' objectUnderTest.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader) then: 'the exception is just logged and not bubbled up' diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java index c59c0e6b19..710bc1fd3f 100644 --- a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java +++ b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java @@ -42,7 +42,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class CpsDataUpdateEventsProducer { - private final EventsProducer eventsProducer; + private final EventsProducer eventsProducer; private final CpsNotificationService cpsNotificationService; diff --git a/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java b/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java index 77a2cd0ddc..7d28dc63df 100644 --- a/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java +++ b/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java @@ -40,13 +40,14 @@ import org.springframework.util.SerializationUtils; @Slf4j @Service @RequiredArgsConstructor -public class EventsProducer { +public class EventsProducer { /** * KafkaTemplate for legacy (non-cloud) events. - * Note: Cloud events should be used. This will be addressed as part of .... + * Note: Cloud events should be used. This will be addressed as part of .... */ - private final KafkaTemplate legacyKafkaEventTemplate; + private final KafkaTemplate legacyKafkaEventTemplate; private final KafkaTemplate cloudEventKafkaTemplate; @@ -64,49 +65,52 @@ public class EventsProducer { } /** - * Generic Event sender. - * Note: Cloud events should be used. This will be addressed as part of .... + * Legacy Event sender. Schemas that implement LegacyEvent are eligible to use this method. + * Note: Cloud events should be used. This will be addressed as part of .... * * @param topicName valid topic name * @param eventKey message key * @param event message payload */ - public void sendEvent(final String topicName, final String eventKey, final T event) { - final CompletableFuture> eventFuture = + public void sendLegacyEvent(final String topicName, final String eventKey, final LegacyEvent event) { + final CompletableFuture> eventFuture = legacyKafkaEventTemplate.send(topicName, eventKey, event); handleLegacyEventCallback(topicName, eventFuture); } /** - * Generic Event sender with headers. + * Legacy Event sender with headers. Schemas that implement LegacyEvent are eligible to use this method. * * @param topicName valid topic name * @param eventKey message key * @param eventHeaders event headers * @param event message payload */ - public void sendEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) { - final ProducerRecord producerRecord = + public void sendLegacyEvent(final String topicName, final String eventKey, final Headers eventHeaders, + final LegacyEvent event) { + final ProducerRecord producerRecord = new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders); - final CompletableFuture> eventFuture = legacyKafkaEventTemplate.send(producerRecord); + final CompletableFuture> eventFuture = + legacyKafkaEventTemplate.send(producerRecord); handleLegacyEventCallback(topicName, eventFuture); } /** - * Generic Event sender with headers. + * Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method. * * @param topicName valid topic name * @param eventKey message key * @param eventHeaders map of event headers * @param event message payload */ - public void sendEvent(final String topicName, final String eventKey, final Map eventHeaders, - final T event) { - sendEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); + public void sendLegacyEvent(final String topicName, final String eventKey, final Map eventHeaders, + final LegacyEvent event) { + sendLegacyEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event); } private void handleLegacyEventCallback(final String topicName, - final CompletableFuture> eventFuture) { + final CompletableFuture> eventFuture) { eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e)); } diff --git a/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy index e36d09387a..bf543787a5 100644 --- a/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy @@ -98,10 +98,10 @@ class EventsProducerSpec extends Specification { new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0) ) ) - def someEvent = Mock(Object) - 1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someEvent) >> eventFuture + def someLegacyEvent = Mock(LegacyEvent) + 1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture when: 'sending the cloud event' - objectUnderTest.sendEvent('some-topic', 'some-event-key', someEvent) + objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', someLegacyEvent) then: 'the correct debug message is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.DEBUG @@ -117,9 +117,9 @@ class EventsProducerSpec extends Specification { new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0) ) ) - def someEvent = Mock(Object.class) + def someLegacyEvent = Mock(LegacyEvent) when: 'sending the legacy event' - objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent) + objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent) then: 'event is sent' 1 * legacyKafkaTemplateMock.send(_) >> eventFuture and: 'the correct debug message is logged' @@ -138,9 +138,9 @@ class EventsProducerSpec extends Specification { new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0) ) ) - def someEvent = Mock(Object.class) + def someLegacyEvent = Mock(LegacyEvent) when: 'sending the legacy event' - objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent) + objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent) then: 'event is sent' 1 * legacyKafkaTemplateMock.send(_) >> eventFuture and: 'the correct debug message is logged'