--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.events;
+
+/**
+ * Marker interface used to categorize legacy events under a common type hierarchy.
+ */
+public interface LegacyEvent { }
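The marker carries no methods; its only purpose is to give the Kafka producer and consumer beans a shared compile-time type. A minimal sketch of the idea (SampleLegacyEvent is a hypothetical, hand-written type; the production event classes are generated from JSON schemas, as shown below):

// Hypothetical event type for illustration only; real event classes are generated.
public class SampleLegacyEvent implements LegacyEvent {
    private String eventId;

    public String getEventId() {
        return eventId;
    }

    public void setEventId(final String eventId) {
        this.eventId = eventId;
    }
}

Replacing the unbounded type parameter T with this interface means that an attempt to send a non-event type no longer compiles, while every schema-generated event remains accepted.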
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.onap.cps</groupId>
+ <artifactId>cps-events</artifactId>
+ </dependency>
</dependencies>
<build>
"description": "The payload for LCM event",
"type": "object",
"javaType" : "org.onap.cps.ncmp.events.lcm.v1.LcmEvent",
+ "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"],
"properties": {
"eventId": {
"description": "The unique id identifying the event",
"description": "The payload of data operation event.",
"type": "object",
"javaType" : "org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent",
+ "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"],
"properties": {
"data": {
"description": "The payload content of the requested data.",
"description": "The payload for CPS async request response event.",
"type": "object",
"javaType" : "org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent",
+ "javaInterfaces" : ["org.onap.cps.events.LegacyEvent"],
"properties": {
"eventId": {
"description": "The unique id identifying the event generated by DMI.",
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.onap.cps.events.LegacyEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.ssl.SslBundles;
/**
 * Kafka configuration for legacy and cloud events.
- *
- * @param <T> valid legacy event to be sent over the wire.
*/
@Configuration
@EnableKafka
@RequiredArgsConstructor
-public class KafkaConfig<T> {
+public class KafkaConfig {
private final KafkaProperties kafkaProperties;
* @return legacy event producer instance.
*/
@Bean
- public ProducerFactory<String, T> legacyEventProducerFactory() {
+ public ProducerFactory<String, LegacyEvent> legacyEventProducerFactory() {
final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
producerConfigProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
if (tracingEnabled) {
- producerConfigProperties.put(
- ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
+ producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingProducerInterceptor.class.getName());
}
return new DefaultKafkaProducerFactory<>(producerConfigProperties);
}
* @return an instance of legacy consumer factory.
*/
@Bean
- public ConsumerFactory<String, T> legacyEventConsumerFactory() {
+ public ConsumerFactory<String, LegacyEvent> legacyEventConsumerFactory() {
final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
consumerConfigProperties.put("spring.deserializer.value.delegate.class", JsonDeserializer.class);
if (tracingEnabled) {
- consumerConfigProperties.put(
- ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
+ consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingConsumerInterceptor.class.getName());
}
return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
}
*/
@Bean
@Primary
- public KafkaTemplate<String, T> legacyEventKafkaTemplate() {
- final KafkaTemplate<String, T> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
+ public KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate() {
+ final KafkaTemplate<String, LegacyEvent> kafkaTemplate = new KafkaTemplate<>(legacyEventProducerFactory());
kafkaTemplate.setConsumerFactory(legacyEventConsumerFactory());
if (tracingEnabled) {
kafkaTemplate.setObservationEnabled(true);
* @return instance of Concurrent kafka listener factory
*/
@Bean
- public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
- final ConcurrentKafkaListenerContainerFactory<String, T> containerFactory =
+ public ConcurrentKafkaListenerContainerFactory<String, LegacyEvent>
+ legacyEventConcurrentKafkaListenerContainerFactory() {
+ final ConcurrentKafkaListenerContainerFactory<String, LegacyEvent> containerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(legacyEventConsumerFactory());
containerFactory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
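With the container factory now typed to LegacyEvent, listeners bind to it by bean name as before. A minimal sketch of such a consumer, assuming a hypothetical topic name and an LcmEvent payload:

import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative listener; 'sample-legacy-topic' is an example value only.
@Slf4j
@Component
public class SampleLegacyEventListener {

    @KafkaListener(topics = "sample-legacy-topic",
            containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory")
    public void onLegacyEvent(final LcmEvent lcmEvent) {
        // The JsonDeserializer delegate resolves the concrete subtype from the record's type headers.
        log.debug("Received legacy event: {}", lcmEvent);
    }
}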
public ProducerFactory<String, CloudEvent> cloudEventProducerFactory() {
final Map<String, Object> producerConfigProperties = kafkaProperties.buildProducerProperties(NO_SSL);
if (tracingEnabled) {
- producerConfigProperties.put(
- ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
+ producerConfigProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingProducerInterceptor.class.getName());
}
return new DefaultKafkaProducerFactory<>(producerConfigProperties);
}
public ConsumerFactory<String, CloudEvent> cloudEventConsumerFactory() {
final Map<String, Object> consumerConfigProperties = kafkaProperties.buildConsumerProperties(NO_SSL);
if (tracingEnabled) {
- consumerConfigProperties.put(
- ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
+ consumerConfigProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ TracingConsumerInterceptor.class.getName());
}
return new DefaultKafkaConsumerFactory<>(consumerConfigProperties);
}
*/
@Bean
public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate() {
- final KafkaTemplate<String, CloudEvent> kafkaTemplate =
- new KafkaTemplate<>(cloudEventProducerFactory());
+ final KafkaTemplate<String, CloudEvent> kafkaTemplate = new KafkaTemplate<>(cloudEventProducerFactory());
kafkaTemplate.setConsumerFactory(cloudEventConsumerFactory());
if (tracingEnabled) {
kafkaTemplate.setObservationEnabled(true);
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
- cloudEventConcurrentKafkaListenerContainerFactory() {
+ cloudEventConcurrentKafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(cloudEventConsumerFactory());
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class DataOperationEventConsumer {
- private final EventsProducer<CloudEvent> eventsProducer;
+ private final EventsProducer eventsProducer;
/**
 * Consume the DataOperation cloud event sent by the producer to the topic 'async-m2m.topic'
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class DmiAsyncRequestResponseEventConsumer {
- private final EventsProducer<NcmpAsyncRequestResponseEvent> eventsProducer;
+ private final EventsProducer eventsProducer;
private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper;
/**
log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent);
final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent =
ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent);
- eventsProducer.sendEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
+ eventsProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
ncmpAsyncRequestResponseEvent.getEventId(),
ncmpAsyncRequestResponseEvent);
}
if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
requestId, cmHandleIdsPerResponseCodesPerOperation);
- @SuppressWarnings("unchecked")
- final EventsProducer<CloudEvent> eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class);
+ final EventsProducer eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class);
log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
clientTopic, requestId, dataOperationCloudEvent.getId());
eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
- private final EventsProducer<CloudEvent> eventsProducer;
+ private final EventsProducer eventsProducer;
private final CmAvcEventService cmAvcEventService;
private final InventoryPersistence inventoryPersistence;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class EventProducer {
- private final EventsProducer<CloudEvent> eventsProducer;
+ private final EventsProducer eventsProducer;
@Value("${app.ncmp.avc.cm-subscription-dmi-in}")
private String dmiInEventTopic;
private static final Tag TAG_METHOD = Tag.of("method", "sendLcmEvent");
private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName());
private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A";
- private final EventsProducer<LcmEvent> eventsProducer;
+ private final EventsProducer eventsProducer;
private final JsonObjectMapper jsonObjectMapper;
private final MeterRegistry meterRegistry;
@SuppressWarnings("unchecked")
final Map<String, Object> lcmEventHeadersMap =
jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
- eventsProducer.sendEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
+ eventsProducer.sendLegacyEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
} catch (final KafkaException e) {
log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage());
} finally {
@RequiredArgsConstructor
public class InventoryEventProducer {
- private final EventsProducer<CloudEvent> eventsProducer;
+ private final EventsProducer eventsProducer;
@Value("${app.ncmp.avc.inventory-events-topic}")
private String ncmpInventoryEventsTopicName;
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import io.cloudevents.CloudEvent
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.CloudEventSerializer
+import org.onap.cps.events.LegacyEvent
import org.spockframework.spring.EnableSharedInjection
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
@Shared
@Autowired
- KafkaTemplate<String, String> legacyEventKafkaTemplate
+ KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate
@Shared
@Autowired
@SpringBean
EventsProducer cpsAsyncRequestResponseEventProducer =
- new EventsProducer<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
+ new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
@SpringBean
class DataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsProducer asyncDataOperationEventProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
TimeUnit.MILLISECONDS.sleep(300)
and: 'event is not consumed'
- 0 * mockEventsProducer.sendEvent(*_)
+ 0 * mockEventsProducer.sendLegacyEvent(*_)
}
def 'Legacy event consumer with valid legacy event.'() {
and: 'a flag to track the send event call'
def sendEventMethodCalled = false
and: 'the (mocked) events producer will use the flag to indicate if it is called'
- mockEventsProducer.sendEvent(*_) >> {
+ mockEventsProducer.sendLegacyEvent(*_) >> {
sendEventMethodCalled = true
}
when: 'send the cloud event'
and: 'a flag to track the send event call'
def sendEventMethodCalled = false
and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event'
- mockEventsProducer.sendEvent(*_) >> {
+ mockEventsProducer.sendLegacyEvent(*_) >> {
sendEventMethodCalled = true
}
when: 'send the event'
JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
@SpringBean
- EventsProducer eventProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
def 'Process per data operation request with #serviceName.'() {
given: 'data operation request with 3 operations'
class CmAvcEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsProducer eventsProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
def mockCmAvcEventService = Mock(CmAvcEventService)
def mockInventoryPersistence = Mock(InventoryPersistence)
def testTopic = 'ncmp-events-test'
@SpringBean
- EventsProducer<LcmEvent> eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@Autowired
JsonObjectMapper jsonObjectMapper
and: 'consumer has a subscription'
legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
when: 'an event is sent'
- eventsProducer.sendEvent(testTopic, eventKey, eventHeader, eventData)
+ eventsProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData)
and: 'topic is polled'
def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
when: 'service is called to send lcm event'
objectUnderTest.sendLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader)
then: 'producer is called #expectedTimesMethodCalled times'
- expectedTimesMethodCalled * mockLcmEventsProducer.sendEvent(_, cmHandleId, _, lcmEvent) >> {
+ expectedTimesMethodCalled * mockLcmEventsProducer.sendLegacyEvent(_, cmHandleId, _, lcmEvent) >> {
args -> {
def eventHeaders = (args[2] as Map<String,Object>)
assert eventHeaders.containsKey('eventId')
def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
objectUnderTest.notificationsEnabled = true
when: 'producer set to throw an exception'
- mockLcmEventsProducer.sendEvent(_, _, _, _) >> { throw new KafkaException('sending failed')}
+ mockLcmEventsProducer.sendLegacyEvent(_, _, _, _) >> { throw new KafkaException('sending failed')}
and: 'an event is published'
objectUnderTest.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
then: 'the exception is just logged and not bubbled up'
@RequiredArgsConstructor
public class CpsDataUpdateEventsProducer {
- private final EventsProducer<CpsDataUpdatedEvent> eventsProducer;
+ private final EventsProducer eventsProducer;
private final CpsNotificationService cpsNotificationService;
@Slf4j
@Service
@RequiredArgsConstructor
-public class EventsProducer<T> {
+public class EventsProducer {
/**
* KafkaTemplate for legacy (non-cloud) events.
- * Note: Cloud events should be used. This will be addressed as part of <a href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
+ * Note: Cloud events should be used. This will be addressed as part of <a
+ * href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
*/
- private final KafkaTemplate<String, T> legacyKafkaEventTemplate;
+ private final KafkaTemplate<String, LegacyEvent> legacyKafkaEventTemplate;
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
}
/**
- * Generic Event sender.
- * Note: Cloud events should be used. This will be addressed as part of <a href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
+     * Legacy event sender. Only event types that implement LegacyEvent can be sent with this method.
+ * Note: Cloud events should be used. This will be addressed as part of <a
+ * href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
*
* @param topicName valid topic name
* @param eventKey message key
* @param event message payload
*/
- public void sendEvent(final String topicName, final String eventKey, final T event) {
- final CompletableFuture<SendResult<String, T>> eventFuture =
+ public void sendLegacyEvent(final String topicName, final String eventKey, final LegacyEvent event) {
+ final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
legacyKafkaEventTemplate.send(topicName, eventKey, event);
handleLegacyEventCallback(topicName, eventFuture);
}
/**
- * Generic Event sender with headers.
+     * Legacy event sender with headers. Only event types that implement LegacyEvent can be sent with this method.
*
* @param topicName valid topic name
* @param eventKey message key
* @param eventHeaders event headers
* @param event message payload
*/
- public void sendEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) {
- final ProducerRecord<String, T> producerRecord =
+ public void sendLegacyEvent(final String topicName, final String eventKey, final Headers eventHeaders,
+ final LegacyEvent event) {
+ final ProducerRecord<String, LegacyEvent> producerRecord =
new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
- final CompletableFuture<SendResult<String, T>> eventFuture = legacyKafkaEventTemplate.send(producerRecord);
+ final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture =
+ legacyKafkaEventTemplate.send(producerRecord);
handleLegacyEventCallback(topicName, eventFuture);
}
/**
- * Generic Event sender with headers.
+     * Legacy event sender with headers supplied as a Map. Only event types that implement LegacyEvent can be sent with this method.
*
* @param topicName valid topic name
* @param eventKey message key
* @param eventHeaders map of event headers
* @param event message payload
*/
- public void sendEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
- final T event) {
- sendEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
+ public void sendLegacyEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
+ final LegacyEvent event) {
+ sendLegacyEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
}
private void handleLegacyEventCallback(final String topicName,
- final CompletableFuture<SendResult<String, T>> eventFuture) {
+ final CompletableFuture<SendResult<String, LegacyEvent>> eventFuture) {
eventFuture.whenComplete((result, e) -> logOutcome(topicName, result, e));
}
new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
)
)
- def someEvent = Mock(Object)
- 1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someEvent) >> eventFuture
+ def someLegacyEvent = Mock(LegacyEvent)
+ 1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someLegacyEvent) >> eventFuture
when: 'sending the cloud event'
- objectUnderTest.sendEvent('some-topic', 'some-event-key', someEvent)
+ objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', someLegacyEvent)
then: 'the correct debug message is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.DEBUG
new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
)
)
- def someEvent = Mock(Object.class)
+ def someLegacyEvent = Mock(LegacyEvent)
when: 'sending the legacy event'
- objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
+ objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
then: 'event is sent'
1 * legacyKafkaTemplateMock.send(_) >> eventFuture
and: 'the correct debug message is logged'
new RecordMetadata(new TopicPartition('some-topic', 0), 0, 0, 0, 0, 0)
)
)
- def someEvent = Mock(Object.class)
+ def someLegacyEvent = Mock(LegacyEvent)
when: 'sending the legacy event'
- objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
+ objectUnderTest.sendLegacyEvent('some-topic', 'some-event-key', sampleEventHeaders, someLegacyEvent)
then: 'event is sent'
1 * legacyKafkaTemplateMock.send(_) >> eventFuture
and: 'the correct debug message is logged'