Refactor EventsProducer class to remove Generic type for legacyevent 18/142118/1
authormpriyank <priyank.maheshwari@est.tech>
Wed, 24 Sep 2025 12:22:21 +0000 (13:22 +0100)
committermpriyank <priyank.maheshwari@est.tech>
Thu, 25 Sep 2025 10:26:59 +0000 (11:26 +0100)
- EventsProducer refactored to not use generic type and instead use
  LegacyEvent type for all the events that still uses non-cloud events.
- Events are implementing LegacyEvent interface now just to
  differentiate b/w cloud and non-cloud events when creating a
  kafkatemplate to send events

Issue-ID: CPS-2990
Change-Id: Ic905ceadefca9e492cb999b633c86d13227c30c2
Signed-off-by: mpriyank <priyank.maheshwari@est.tech>
25 files changed:
cps-events/src/main/java/org/onap/cps/events/LegacyEvent.java [new file with mode: 0644]
cps-ncmp-events/pom.xml
cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-schema-v1.json
cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/data-operation-event-schema-1.0.0.json
cps-ncmp-events/src/main/resources/schemas/ncmp/async-m2m/ncmp-async-request-response-event-schema-v1.json
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/KafkaConfig.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/KafkaConfigSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy
cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java
cps-service/src/main/java/org/onap/cps/events/EventsProducer.java
cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy

diff --git a/cps-events/src/main/java/org/onap/cps/events/LegacyEvent.java b/cps-events/src/main/java/org/onap/cps/events/LegacyEvent.java
new file mode 100644 (file)
index 0000000..26705c2
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ *  ============LICENSE_START=======================================================
+ *  Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ *  SPDX-License-Identifier: Apache-2.0
+ *  ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.events;
+
+/**
+ * Marker interface used to categorize legacy events under a common type hierarchy.
+ */
+public interface LegacyEvent { }
index 997e39f..5ac8a3b 100644 (file)
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.onap.cps</groupId>
+            <artifactId>cps-events</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
index bd0d90d..c79af2e 100644 (file)
@@ -68,6 +68,7 @@
       "description": "The payload for LCM event",
       "type": "object",
       "javaType" : "org.onap.cps.ncmp.events.lcm.v1.LcmEvent",
+      "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"],
       "properties": {
         "eventId": {
           "description": "The unique id identifying the event",
index c291518..088f2b5 100644 (file)
@@ -7,6 +7,7 @@
       "description": "The payload of data operation event.",
       "type": "object",
       "javaType" : "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent",
+      "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"],
       "properties": {
         "data": {
           "description": "The payload content of the requested data.",
index cb10f75..4a00fce 100644 (file)
@@ -7,6 +7,7 @@
       "description": "The payload for CPS async request response event.",
       "type": "object",
       "javaType" : "org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent",
+      "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"],
       "properties": {
         "eventId": {
           "description": "The unique id identifying the event generated by DMI.",
index 8475be6..be70833 100644 (file)
@@ -28,6 +28,7 @@ import java.util.Map;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.cps.events.LegacyEvent;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.ssl.SslBundles;
@@ -46,13 +47,11 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 
 /**
  * kafka Configuration for legacy and cloud events.
- *
- * @param <T> valid legacy event to be sent over the wire.
  */
 @Configuration
 @EnableKafka
 @RequiredArgsConstructor
-public class KafkaConfig<T> {
+public class KafkaConfig {
 
     private final KafkaProperties kafkaProperties;
 
@@ -68,12 +67,12 @@ public class KafkaConfig<T> {
      * @return legacy event producer instance.
      */
     @Bean
-    public ProducerFactory<String, T> legacyEventProducerFactory() {
+    public ProducerFactory<String, LegacyEvent> legacyEventProducerFactory() {
         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
         producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
         if (tracingEnabled) {
-            producerConfigProperties.put(
-                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
+            producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    TracingProducerInterceptor.class.getName());
         }
         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
     }
@@ -85,12 +84,12 @@ public class KafkaConfig<T> {
      * @return an instance of legacy consumer factory.
      */
     @Bean
-    public ConsumerFactory<String, T> legacyEventConsumerFactory() {
+    public ConsumerFactory<String, LegacyEvent> legacyEventConsumerFactory() {
         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
         consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
         if (tracingEnabled) {
-            consumerConfigProperties.put(
-                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
+            consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    TracingConsumerInterceptor.class.getName());
         }
         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
     }
@@ -102,8 +101,8 @@ public class KafkaConfig<T> {
      */
     @Bean
     @Primary
-    public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
-        final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+    public KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate() {
+        final KafkaTemplate<String, LegacyEvent> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
         kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
         if (tracingEnabled) {
             kafkaTemplate.setObservationEnabled(true);
@@ -117,8 +116,9 @@ public class KafkaConfig<T> {
      * @return instance of Concurrent kafka listener factory
      */
     @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
-        final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
+    public ConcurrentKafkaListenerContainerFactory<String, LegacyEvent>
+                            legacyEventConcurrentKafkaListenerContainerFactory() {
+        final ConcurrentKafkaListenerContainerFactory<String, LegacyEvent> containerFactory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         containerFactory.setConsumerFactory(legacyEventConsumerFactory());
         containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
@@ -138,8 +138,8 @@ public class KafkaConfig<T> {
     public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
         final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
         if (tracingEnabled) {
-            producerConfigProperties.put(
-                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
+            producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    TracingProducerInterceptor.class.getName());
         }
         return new DefaultKafkaProducerFactory<>(producerConfigProperties);
     }
@@ -154,8 +154,8 @@ public class KafkaConfig<T> {
     public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
         final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
         if (tracingEnabled) {
-            consumerConfigProperties.put(
-                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
+            consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    TracingConsumerInterceptor.class.getName());
         }
         return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
     }
@@ -168,8 +168,7 @@ public class KafkaConfig<T> {
      */
     @Bean
     public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
-        final KafkaTemplate<String, CloudEvent> kafkaTemplate =
-            new KafkaTemplate<>(cloudEventProducerFactory());
+        final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
         kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
         if (tracingEnabled) {
             kafkaTemplate.setObservationEnabled(true);
@@ -184,7 +183,7 @@ public class KafkaConfig<T> {
      */
     @Bean
     public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
-                                        cloudEventConcurrentKafkaListenerContainerFactory() {
+                        cloudEventConcurrentKafkaListenerContainerFactory() {
         final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
                 new ConcurrentKafkaListenerContainerFactory<>();
         containerFactory.setConsumerFactory(cloudEventConsumerFactory());
index 22f20c8..9b43837 100644 (file)
@@ -39,7 +39,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class DataOperationEventConsumer {
 
-    private final EventsProducer<CloudEvent> eventsProducer;
+    private final EventsProducer eventsProducer;
 
     /**
      * Consume the DataOperation cloud event sent by producer to topic 'async-m2m.topic'
index 2575508..802e15a 100644 (file)
@@ -38,7 +38,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class DmiAsyncRequestResponseEventConsumer {
 
-    private final EventsProducer<NcmpAsyncRequestResponseEvent> eventsProducer;
+    private final EventsProducer eventsProducer;
     private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper;
 
     /**
@@ -56,7 +56,7 @@ public class DmiAsyncRequestResponseEventConsumer {
         log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent);
         final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent =
                 ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent);
-        eventsProducer.sendEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
+        eventsProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
                                      ncmpAsyncRequestResponseEvent.getEventId(),
                                      ncmpAsyncRequestResponseEvent);
     }
index 6fddd9e..8edd21f 100644 (file)
@@ -141,8 +141,7 @@ public class DmiDataOperationsHelper {
         if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
             final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
                     requestId, cmHandleIdsPerResponseCodesPerOperation);
-            @SuppressWarnings("unchecked")
-            final EventsProducer<CloudEvent> eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class);
+            final EventsProducer eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class);
             log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
                     clientTopic, requestId, dataOperationCloudEvent.getId());
             eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
index cdde02d..0eb6573 100644 (file)
@@ -52,7 +52,7 @@ public class CmAvcEventConsumer {
     @Value("${app.ncmp.avc.cm-events-topic}")
     private String cmEventsTopicName;
 
-    private final EventsProducer<CloudEvent> eventsProducer;
+    private final EventsProducer eventsProducer;
     private final CmAvcEventService cmAvcEventService;
     private final InventoryPersistence inventoryPersistence;
 
index 46c9457..d1fccdb 100644 (file)
@@ -37,7 +37,7 @@ import org.springframework.stereotype.Component;
 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class EventProducer {
 
-    private final EventsProducer<CloudEvent> eventsProducer;
+    private final EventsProducer eventsProducer;
 
     @Value("${app.ncmp.avc.cm-subscription-dmi-in}")
     private String dmiInEventTopic;
index 4ee6e73..9e5a97d 100644 (file)
@@ -49,7 +49,7 @@ public class LcmEventsProducer {
     private static final Tag TAG_METHOD = Tag.of("method", "sendLcmEvent");
     private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName());
     private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A";
-    private final EventsProducer<LcmEvent> eventsProducer;
+    private final EventsProducer eventsProducer;
     private final JsonObjectMapper jsonObjectMapper;
     private final MeterRegistry meterRegistry;
 
@@ -75,7 +75,7 @@ public class LcmEventsProducer {
                 @SuppressWarnings("unchecked")
                 final Map<String, Object> lcmEventHeadersMap =
                         jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
-                eventsProducer.sendEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
+                eventsProducer.sendLegacyEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
             } catch (final KafkaException e) {
                 log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage());
             } finally {
index 8f83e28..7d116cd 100644 (file)
@@ -38,7 +38,7 @@ import org.springframework.stereotype.Service;
 @RequiredArgsConstructor
 public class InventoryEventProducer {
 
-    private final EventsProducer<CloudEvent> eventsProducer;
+    private final EventsProducer eventsProducer;
 
     @Value("${app.ncmp.avc.inventory-events-topic}")
     private String ncmpInventoryEventsTopicName;
index 9e1649e..b2e42ce 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * ============LICENSE_START=======================================================
- *  Copyright (C) 2023-2024 Nordix Foundation
+ *  Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@ package org.onap.cps.ncmp.config
 import io.cloudevents.CloudEvent
 import io.cloudevents.kafka.CloudEventDeserializer
 import io.cloudevents.kafka.CloudEventSerializer
+import org.onap.cps.events.LegacyEvent
 import org.spockframework.spring.EnableSharedInjection
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties
@@ -43,7 +44,7 @@ class KafkaConfigSpec extends Specification {
 
     @Shared
     @Autowired
-    KafkaTemplate<String, String> legacyEventKafkaTemplate
+    KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate
 
     @Shared
     @Autowired
index 8ea73b6..fec4fba 100644 (file)
@@ -44,7 +44,7 @@ class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBase
 
     @SpringBean
     EventsProducer cpsAsyncRequestResponseEventProducer =
-        new EventsProducer<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
+        new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
 
 
     @SpringBean
index 9c9768a..7b7faf3 100644 (file)
@@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 class DataOperationEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsProducer asyncDataOperationEventProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @SpringBean
     DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
index baca445..602a54e 100644 (file)
@@ -61,7 +61,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
         then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
             TimeUnit.MILLISECONDS.sleep(300)
         and: 'event is not consumed'
-            0 * mockEventsProducer.sendEvent(*_)
+            0 * mockEventsProducer.sendLegacyEvent(*_)
     }
 
     def 'Legacy event consumer with valid legacy event.'() {
@@ -70,7 +70,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
         and: 'a flag to track the send event call'
             def sendEventMethodCalled = false
         and: 'the (mocked) events producer will use the flag to indicate if it is called'
-            mockEventsProducer.sendEvent(*_) >> {
+            mockEventsProducer.sendLegacyEvent(*_) >> {
                 sendEventMethodCalled = true
             }
         when: 'send the cloud event'
index 65e8af8..b082945 100644 (file)
@@ -80,7 +80,7 @@ class SerializationIntegrationSpec extends ConsumerBaseSpec {
         and: 'a flag to track the send event call'
             def sendEventMethodCalled = false
         and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event'
-            mockEventsProducer.sendEvent(*_) >> {
+            mockEventsProducer.sendLegacyEvent(*_) >> {
                 sendEventMethodCalled = true
             }
         when: 'send the event'
index f282b75..a42bf1f 100644 (file)
@@ -56,7 +56,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
     JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
 
     @SpringBean
-    EventsProducer eventProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     def 'Process per data operation request with #serviceName.'() {
         given: 'data operation request with 3 operations'
index 87e026e..5a0980c 100644 (file)
@@ -51,7 +51,7 @@ import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
 class CmAvcEventConsumerSpec extends MessagingBaseSpec {
 
     @SpringBean
-    EventsProducer eventsProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     def mockCmAvcEventService = Mock(CmAvcEventService)
     def mockInventoryPersistence = Mock(InventoryPersistence)
index 9d2511a..ed984ec 100644 (file)
@@ -48,7 +48,7 @@ class EventsProducerSpec extends MessagingBaseSpec {
     def testTopic = 'ncmp-events-test'
 
     @SpringBean
-    EventsProducer<LcmEvent> eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+    EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
 
     @Autowired
     JsonObjectMapper jsonObjectMapper
@@ -85,7 +85,7 @@ class EventsProducerSpec extends MessagingBaseSpec {
         and: 'consumer has a subscription'
             legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
         when: 'an event is sent'
-            eventsProducer.sendEvent(testTopic, eventKey, eventHeader, eventData)
+            eventsProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData)
         and: 'topic is polled'
             def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
         then: 'poll returns one record'
index d9944a7..4bcb89a 100644 (file)
@@ -57,7 +57,7 @@ class LcmEventsProducerSpec extends Specification {
         when: 'service is called to send lcm event'
             objectUnderTest.sendLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader)
         then: 'producer is called #expectedTimesMethodCalled times'
-            expectedTimesMethodCalled * mockLcmEventsProducer.sendEvent(_, cmHandleId, _, lcmEvent) >> {
+            expectedTimesMethodCalled * mockLcmEventsProducer.sendLegacyEvent(_, cmHandleId, _, lcmEvent) >> {
                 args -> {
                     def eventHeaders = (args[2] as Map<String,Object>)
                     assert eventHeaders.containsKey('eventId')
@@ -91,7 +91,7 @@ class LcmEventsProducerSpec extends Specification {
             def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
             objectUnderTest.notificationsEnabled = true
         when: 'producer set to throw an exception'
-            mockLcmEventsProducer.sendEvent(_, _, _, _) >> { throw new KafkaException('sending failed')}
+            mockLcmEventsProducer.sendLegacyEvent(_, _, _, _) >> { throw new KafkaException('sending failed')}
         and: 'an event is publised'
             objectUnderTest.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
         then: 'the exception is just logged and not bubbled up'
index c59c0e6..710bc1f 100644 (file)
@@ -42,7 +42,7 @@ import org.springframework.stereotype.Service;
 @RequiredArgsConstructor
 public class CpsDataUpdateEventsProducer {
 
-    private final EventsProducer<CpsDataUpdatedEvent> eventsProducer;
+    private final EventsProducer eventsProducer;
 
     private final CpsNotificationService cpsNotificationService;
 
index 77a2cd0..7d28dc6 100644 (file)
@@ -40,13 +40,14 @@ import org.springframework.util.SerializationUtils;
 @Slf4j
 @Service
 @RequiredArgsConstructor
-public class EventsProducer<T> {
+public class EventsProducer {
 
     /**
      * KafkaTemplate for legacy (non-cloud) events.
-     * Note: Cloud events should be used. This will be addressed as part of  <a href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
+     * Note: Cloud events should be used. This will be addressed as part of  <a
+     * href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
      */
-    private final KafkaTemplate<String, T> legacyKafkaEventTemplate;
+    private final KafkaTemplate<String, LegacyEvent> legacyKafkaEventTemplate;
 
     private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
 
@@ -64,49 +65,52 @@ public class EventsProducer<T> {
     }
 
     /**
-     * Generic Event sender.
-     * Note: Cloud events should be used. This will be addressed as part of  <a href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
+     * Legacy Event sender. Schemas that implement LegacyEvent are eligible to use this method.
+     * Note: Cloud events should be used. This will be addressed as part of  <a
+     * href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
      *
      * @param topicName valid topic name
      * @param eventKey  message key
      * @param event     message payload
      */
-    public void sendEvent(final String topicName, final String eventKey, final T event) {
-        final CompletableFuture<SendResult<String, T>> eventFuture =
+    public void sendLegacyEvent(final String topicName, final String eventKey, final LegacyEvent event) {
+        final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
                 legacyKafkaEventTemplate.send(topicName, eventKey, event);
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
     /**
-     * Generic Event sender with headers.
+     * Legacy Event sender with headers. Schemas that implement LegacyEvent are eligible to use this method.
      *
      * @param topicName    valid topic name
      * @param eventKey     message key
      * @param eventHeaders event headers
      * @param event        message payload
      */
-    public void sendEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) {
-        final ProducerRecord<String, T> producerRecord =
+    public void sendLegacyEvent(final String topicName, final String eventKey, final Headers eventHeaders,
+            final LegacyEvent event) {
+        final ProducerRecord<String, LegacyEvent> producerRecord =
                 new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
-        final CompletableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord);
+        final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
+                legacyKafkaEventTemplate.send(producerRecord);
         handleLegacyEventCallback(topicName, eventFuture);
     }
 
     /**
-     * Generic Event sender with headers.
+     * Legacy Event sender with headers in a Map. Schemas that implement LegacyEvent are eligible to use this method.
      *
      * @param topicName    valid topic name
      * @param eventKey     message key
      * @param eventHeaders map of event headers
      * @param event        message payload
      */
-    public void sendEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
-                          final T event) {
-        sendEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
+    public void sendLegacyEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
+            final LegacyEvent event) {
+        sendLegacyEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
     }
 
     private void handleLegacyEventCallback(final String topicName,
-                                           final CompletableFuture<SendResult<String, T>> eventFuture) {
+            final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture) {
         eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
     }
 
index e36d093..bf54378 100644 (file)
@@ -98,10 +98,10 @@ class EventsProducerSpec extends Specification {
                     new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
                 )
             )
-            def someEvent = Mock(Object)
-            1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someEvent) >> eventFuture
+            def someLegacyEvent = Mock(LegacyEvent)
+            1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture
         when: 'sending the cloud event'
-            objectUnderTest.sendEvent('some-topic', 'some-event-key', someEvent)
+            objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', someLegacyEvent)
         then: 'the correct debug message is logged'
             def lastLoggingEvent = logger.list[0]
             assert lastLoggingEvent.level == Level.DEBUG
@@ -117,9 +117,9 @@ class EventsProducerSpec extends Specification {
                     new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
                 )
             )
-            def someEvent = Mock(Object.class)
+            def someLegacyEvent = Mock(LegacyEvent)
         when: 'sending the legacy event'
-            objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
+            objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
         then: 'event is sent'
             1 * legacyKafkaTemplateMock.send(_) >> eventFuture
         and: 'the correct debug message is logged'
@@ -138,9 +138,9 @@ class EventsProducerSpec extends Specification {
                     new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
                 )
             )
-            def someEvent = Mock(Object.class)
+            def someLegacyEvent = Mock(LegacyEvent)
         when: 'sending the legacy event'
-            objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
+            objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
         then: 'event is sent'
             1 * legacyKafkaTemplateMock.send(_) >> eventFuture
         and: 'the correct debug message is logged'