* ============LICENSE_START=======================================================
* Copyright (C) 2021 Pantheon.tech
* Modifications Copyright (C) 2021 highstreet technologies GmbH
- * Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe
+ * Modifications Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
* Modifications Copyright (C) 2021-2022 Bell Canada.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
import groovy.json.JsonSlurper
import org.mapstruct.factory.Mappers
import org.onap.cps.TestUtils
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.impl.NetworkCmProxyInventoryFacadeImpl
import org.onap.cps.ncmp.api.inventory.models.CompositeState
import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
}
def cleanup() {
- ((Logger) LoggerFactory.getLogger(EventsPublisher.class)).detachAndStopAllAppenders()
+ ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
}
def 'Get Resource Data from pass-through operational.'() {
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
/**
 * Kafka configuration for legacy and cloud events.
*
- * @param <T> valid legacy event to be published over the wire.
+ * @param <T> valid legacy event to be sent over the wire.
*/
@Configuration
@EnableKafka
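A minimal sketch of the template wiring such a configuration is expected to provide, assuming the bean names used by the test specs in this change (legacyEventKafkaTemplate and cloudEventKafkaTemplate); the real class body is not part of this diff:

    // Sketch only; assumes org.springframework.kafka.core.KafkaTemplate and ProducerFactory.
    @Bean
    public KafkaTemplate<String, T> legacyEventKafkaTemplate(final ProducerFactory<String, T> legacyEventProducerFactory) {
        return new KafkaTemplate<>(legacyEventProducerFactory);
    }

    @Bean
    public KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate(
            final ProducerFactory<String, CloudEvent> cloudEventProducerFactory) {
        return new KafkaTemplate<>(cloudEventProducerFactory);
    }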
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2023-2024 Nordix Foundation.
+ * Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
- private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final EventsProducer<CloudEvent> eventsProducer;
/**
 * Incoming Cm AvcEvent in the form of a Consumer Record; it will be forwarded as is to a target topic.
final CloudEvent outgoingAvcEvent = cmAvcEventAsConsumerRecord.value();
final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
- eventsPublisher.publishCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+ eventsProducer.sendCloudEvent(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
}
}
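For orientation, a hedged sketch of the renamed producer API used throughout this change: EventsPublisher#publishEvent and #publishCloudEvent become EventsProducer#sendEvent and #sendCloudEvent. Topic and key values below are illustrative only:

    // Sketch only: forwarding a consumed CloudEvent with the renamed API.
    void forwardCloudEvent(final EventsProducer<CloudEvent> eventsProducer, final CloudEvent incomingEvent) {
        // was: eventsPublisher.publishCloudEvent(topic, key, event)
        eventsProducer.sendCloudEvent("cm-events", "some-event-key", incomingEvent);
    }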
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import io.cloudevents.CloudEvent;
import java.util.Map;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.DmiInEvent;
import org.onap.cps.ncmp.utils.events.NcmpEvent;
import org.springframework.beans.factory.annotation.Value;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class DmiInEventProducer {
- private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final EventsProducer<CloudEvent> eventsProducer;
@Value("${app.ncmp.avc.cm-subscription-dmi-in}")
private String dmiInEventTopic;
/**
- * Publish the event to the provided dmi plugin with key as subscription id and the event is in Cloud Event format.
+ * Send the event to the provided dmi plugin, using the subscription id as key; the event is in Cloud Event format.
*
* @param subscriptionId Cm Subscription Id
* @param dmiPluginName Dmi Plugin Name
* @param eventType Type of event
* @param dmiInEvent Cm Notification Subscription event for Dmi
*/
- public void publishDmiInEvent(final String subscriptionId, final String dmiPluginName,
- final String eventType, final DmiInEvent dmiInEvent) {
- eventsPublisher.publishCloudEvent(dmiInEventTopic, subscriptionId,
+ public void sendDmiInEvent(final String subscriptionId, final String dmiPluginName,
+ final String eventType, final DmiInEvent dmiInEvent) {
+ eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId,
buildAndGetDmiInEventAsCloudEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent));
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = dmiCacheHandler.get(subscriptionId);
final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, dmiSubscriptionsPerDmi);
- ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false);
+ ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false);
}
private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
handleNewCmSubscription(subscriptionId);
scheduleNcmpOutEventResponse(subscriptionId, "subscriptionCreateResponse");
} else {
- rejectAndPublishCreateRequest(subscriptionId, predicates);
+ rejectAndSendCreateRequest(subscriptionId, predicates);
}
}
getLastRemainingAndOverlappingSubscriptionsPerDmi(subscriptionDataNodes);
dmiCacheHandler.add(subscriptionId, mergeDmiCmSubscriptionDetailsPerDmiMaps(dmiCmSubscriptionTuple));
if (dmiCmSubscriptionTuple.lastRemainingSubscriptionsPerDmi().isEmpty()) {
- acceptAndPublishDeleteRequest(subscriptionId);
+ acceptAndSendDeleteRequest(subscriptionId);
} else {
sendSubscriptionDeleteRequestToDmi(subscriptionId,
dmiCmSubscriptionDetailsPerDmiMapper.toDmiCmSubscriptionsPerDmi(
}
private void scheduleNcmpOutEventResponse(final String subscriptionId, final String eventType) {
- ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, null, true);
+ ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, eventType, null, true);
}
- private void rejectAndPublishCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
+ private void rejectAndSendCreateRequest(final String subscriptionId, final List<Predicate> predicates) {
final Set<String> subscriptionTargetFilters =
predicates.stream().flatMap(predicate -> predicate.getTargetFilter().stream())
.collect(Collectors.toSet());
final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEventForRejectedRequest(subscriptionId,
new ArrayList<>(subscriptionTargetFilters));
- ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
+ ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionCreateResponse", ncmpOutEvent, false);
}
- private void acceptAndPublishDeleteRequest(final String subscriptionId) {
+ private void acceptAndSendDeleteRequest(final String subscriptionId) {
final Set<String> dmiServiceNames = dmiCacheHandler.get(subscriptionId).keySet();
for (final String dmiServiceName : dmiServiceNames) {
dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiServiceName,
}
final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
dmiCacheHandler.get(subscriptionId));
- ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
+ ncmpOutEventProducer.sendNcmpOutEvent(subscriptionId, "subscriptionDeleteResponse", ncmpOutEvent,
false);
}
if (dmiCmSubscriptionPredicates.isEmpty()) {
acceptAndPersistCmSubscriptionPerDmi(subscriptionId, dmiPluginName);
} else {
- publishDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
+ sendDmiInEventPerDmi(subscriptionId, dmiPluginName, dmiCmSubscriptionPredicates);
}
});
}
- private void publishDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
- final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
+ private void sendDmiInEventPerDmi(final String subscriptionId, final String dmiPluginName,
+ final List<DmiCmSubscriptionPredicate> dmiCmSubscriptionPredicates) {
final DmiInEvent dmiInEvent = dmiInEventMapper.toDmiInEvent(dmiCmSubscriptionPredicates);
- dmiInEventProducer.publishDmiInEvent(subscriptionId, dmiPluginName,
+ dmiInEventProducer.sendDmiInEvent(subscriptionId, dmiPluginName,
"subscriptionCreateRequest", dmiInEvent);
}
final DmiInEvent dmiInEvent =
dmiInEventMapper.toDmiInEvent(
dmiCmSubscriptionDetails.getDmiCmSubscriptionPredicates());
- dmiInEventProducer.publishDmiInEvent(subscriptionId,
+ dmiInEventProducer.sendDmiInEvent(subscriptionId,
dmiPluginName, "subscriptionDeleteRequest", dmiInEvent);
});
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
import org.onap.cps.ncmp.utils.events.NcmpEvent;
@Value("${ncmp.timers.subscription-forwarding.dmi-response-timeout-ms}")
private Integer dmiOutEventTimeoutInMs;
- private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final EventsProducer<CloudEvent> eventsProducer;
private final NcmpOutEventMapper ncmpOutEventMapper;
private final DmiCacheHandler dmiCacheHandler;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
new ConcurrentHashMap<>();
/**
- * Publish the event to the client who requested the subscription with key as subscription id and event is Cloud
+ * Send the event to the client who requested the subscription, using the subscription id as key; the event is Cloud
* Event compliant.
*
* @param subscriptionId Cm Subscription Id
* @param ncmpOutEvent Cm Notification Subscription Event for the
* client
* @param isScheduledEvent Determines if the event is to be scheduled
- * or published now
+ * or sent now
*/
- public void publishNcmpOutEvent(final String subscriptionId, final String eventType,
- final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
+ public void sendNcmpOutEvent(final String subscriptionId, final String eventType,
+ final NcmpOutEvent ncmpOutEvent, final boolean isScheduledEvent) {
final String taskKey = subscriptionId.concat(eventType);
if (isScheduledEvent && !scheduledTasksPerSubscriptionIdAndEventType.containsKey(taskKey)) {
- final ScheduledFuture<?> scheduledFuture = scheduleAndPublishNcmpOutEvent(subscriptionId, eventType);
+ final ScheduledFuture<?> scheduledFuture = scheduleAndSendNcmpOutEvent(subscriptionId, eventType);
scheduledTasksPerSubscriptionIdAndEventType.putIfAbsent(taskKey, scheduledFuture);
log.debug("Scheduled the Cm Subscription Event for subscriptionId : {} and eventType : {}", subscriptionId,
eventType);
} else {
cancelScheduledTask(taskKey);
if (ncmpOutEvent != null) {
- publishNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
- log.debug("Published Cm Subscription Event on demand for subscriptionId : {} and eventType : {}",
+ sendNcmpOutEventNow(subscriptionId, eventType, ncmpOutEvent);
+ log.debug("Sent Cm Subscription Event on demand for subscriptionId : {} and eventType : {}",
subscriptionId, eventType);
}
}
.asCloudEvent();
}
- private ScheduledFuture<?> scheduleAndPublishNcmpOutEvent(final String subscriptionId, final String eventType) {
+ private ScheduledFuture<?> scheduleAndSendNcmpOutEvent(final String subscriptionId, final String eventType) {
final NcmpOutEventPublishingTask ncmpOutEventPublishingTask =
- new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsPublisher,
+ new NcmpOutEventPublishingTask(ncmpOutEventTopic, subscriptionId, eventType, eventsProducer,
ncmpOutEventMapper, dmiCacheHandler);
return scheduledExecutorService.schedule(ncmpOutEventPublishingTask, dmiOutEventTimeoutInMs,
TimeUnit.MILLISECONDS);
}
}
- private void publishNcmpOutEventNow(final String subscriptionId, final String eventType,
- final NcmpOutEvent ncmpOutEvent) {
+ private void sendNcmpOutEventNow(final String subscriptionId, final String eventType,
+ final NcmpOutEvent ncmpOutEvent) {
final CloudEvent ncmpOutEventAsCloudEvent =
buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent);
- eventsPublisher.publishCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent);
+ eventsProducer.sendCloudEvent(ncmpOutEventTopic, subscriptionId, ncmpOutEventAsCloudEvent);
dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
}
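A short usage sketch of the two sendNcmpOutEvent paths implemented above, mirroring the test scenarios later in this change (subscription id and event type are illustrative):

    // Sketch only: schedule the client response first, then send it on demand once the DMI replies.
    ncmpOutEventProducer.sendNcmpOutEvent("my-subscription-id", "subscriptionCreateResponse", null, true);
    // ... later, when the DMI response has been processed and the out event has been built:
    ncmpOutEventProducer.sendNcmpOutEvent("my-subscription-id", "subscriptionCreateResponse", ncmpOutEvent, false);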
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 Nordix Foundation
+ * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
private final String topicName;
private final String subscriptionId;
private final String eventType;
- private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final EventsProducer<CloudEvent> eventsProducer;
private final NcmpOutEventMapper ncmpOutEventMapper;
private final DmiCacheHandler dmiCacheHandler;
/**
- * Delegating the responsibility of publishing NcmpOutEvent as a separate task which will
+ * Delegating the responsibility of sending the NcmpOutEvent to a separate task which will
* be called after a specified delay.
*/
@Override
dmiCacheHandler.get(subscriptionId);
final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId,
dmiSubscriptionsPerDmi);
- eventsPublisher.publishCloudEvent(topicName, subscriptionId,
+ eventsProducer.sendCloudEvent(topicName, subscriptionId,
buildAndGetNcmpOutEventAsCloudEvent(subscriptionId, eventType, ncmpOutEvent));
dmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId);
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2025 Nordix Foundation
+ * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
cmHandleIdsPerResponseCodesPerOperation.add(dmiDataOperationRequestBody,
Map.of(dmiClientRequestException.getNcmpResponseStatus(), cmHandleIds));
});
- DmiDataOperationsHelper.publishErrorMessageToClientTopic(topicName, requestId,
+ DmiDataOperationsHelper.sendErrorMessageToClientTopic(topicName, requestId,
cmHandleIdsPerResponseCodesPerOperation);
}
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class DataOperationEventConsumer {
- private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final EventsProducer<CloudEvent> eventsProducer;
/**
- * Consume the DataOperation cloud event published by producer to topic 'async-m2m.topic'
- * and publish the same to client specified topic.
+ * Consume the DataOperation cloud event sent by the producer to topic 'async-m2m.topic'
+ * and send it to the client-specified topic.
*
* @param dataOperationEventConsumerRecord consuming event as a ConsumerRecord.
*/
filter = "includeDataOperationEventsOnly",
groupId = "ncmp-data-operation-event-group",
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeAndPublish(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
+ public void consumeAndSend(final ConsumerRecord<String, CloudEvent> dataOperationEventConsumerRecord) {
log.debug("Consuming event payload {} ...", dataOperationEventConsumerRecord.value());
final String eventTarget = KafkaHeaders.getParsedKafkaHeader(
dataOperationEventConsumerRecord.headers(), "ce_destination");
final String correlationId = KafkaHeaders.getParsedKafkaHeader(
dataOperationEventConsumerRecord.headers(), "ce_correlationid");
- eventsPublisher.publishCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
+ eventsProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class DmiAsyncRequestResponseEventConsumer {
- private final EventsPublisher<NcmpAsyncRequestResponseEvent> eventsPublisher;
+ private final EventsProducer<NcmpAsyncRequestResponseEvent> eventsProducer;
private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper;
/**
log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent);
final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent =
ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent);
- eventsPublisher.publishEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
+ eventsProducer.sendEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
ncmpAsyncRequestResponseEvent.getEventId(),
ncmpAsyncRequestResponseEvent);
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.onap.cps.ncmp.api.NcmpResponseStatus;
import org.onap.cps.ncmp.api.data.models.DataOperationDefinition;
import org.onap.cps.ncmp.api.data.models.DataOperationRequest;
DmiDataOperation.buildDmiDataOperationRequestBodyWithoutCmHandles(dataOperationDefinitionIn),
CM_HANDLES_NOT_READY, nonReadyCmHandleReferences);
}
- publishErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleReferencesPerResponseCodesPerOperation);
+ sendErrorMessageToClientTopic(topicParamInQuery, requestId, cmHandleReferencesPerResponseCodesPerOperation);
return dmiDataOperationsOutPerDmiServiceName;
}
}
/**
- * Creates data operation cloud event and publish it to client topic.
+ * Creates a data operation cloud event and sends it to the client topic.
*
* @param clientTopic client given topic
* @param requestId unique identifier per request
* @param cmHandleIdsPerResponseCodesPerOperation list of cm handle ids per operation with response code
*/
- public static void publishErrorMessageToClientTopic(final String clientTopic,
- final String requestId,
- final MultiValueMap<DmiDataOperation,
+ public static void sendErrorMessageToClientTopic(final String clientTopic,
+ final String requestId,
+ final MultiValueMap<DmiDataOperation,
Map<NcmpResponseStatus, List<String>>>
cmHandleIdsPerResponseCodesPerOperation) {
if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
requestId, cmHandleIdsPerResponseCodesPerOperation);
- final EventsPublisher<CloudEvent> eventsPublisher = CpsApplicationContext.getCpsBean(EventsPublisher.class);
- log.warn("publishing error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
+ final EventsProducer<CloudEvent> eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class);
+ log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
clientTopic, requestId, dataOperationCloudEvent.getId());
- eventsPublisher.publishCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+ eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
}
}
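A hedged sketch of how a caller might assemble the MultiValueMap argument for sendErrorMessageToClientTopic; the DmiDataOperation instance, topic, request id and cm handle ids are illustrative, and UNKNOWN_ERROR is assumed to be an NcmpResponseStatus constant as used in the tests below:

    // Sketch only: one operation mapped to a response code and the affected cm handle ids.
    final MultiValueMap<DmiDataOperation, Map<NcmpResponseStatus, List<String>>>
            cmHandleIdsPerResponseCodesPerOperation = new LinkedMultiValueMap<>();
    cmHandleIdsPerResponseCodesPerOperation.add(someDmiDataOperation, // assumed built elsewhere
            Map.of(NcmpResponseStatus.UNKNOWN_ERROR, List.of("cm-handle-1", "cm-handle-2")));
    DmiDataOperationsHelper.sendErrorMessageToClientTopic("my-client-topic", "my-request-id",
            cmHandleIdsPerResponseCodesPerOperation);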
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
import org.onap.cps.ncmp.events.lcm.v1.Values;
import org.springframework.stereotype.Service;
/**
- * LcmEventsProducer to call the publisher and publish on the dedicated topic.
+ * LcmEventsProducer to call the producer and send events on the dedicated topic.
*/
@Slf4j
private static final Tag TAG_METHOD = Tag.of("method", "publishLcmEvent");
private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName());
private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A";
- private final EventsPublisher<LcmEvent> eventsPublisher;
+ private final EventsProducer<LcmEvent> eventsProducer;
private final JsonObjectMapper jsonObjectMapper;
private final MeterRegistry meterRegistry;
try {
final Map<String, Object> lcmEventHeadersMap =
jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
- eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
+ eventsProducer.sendEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
} catch (final KafkaException e) {
log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage());
} finally {
}
/**
- * Add cmHandles to the cache and publish notification for initial trust level of cmHandles if it is NONE.
+ * Add cmHandles to the cache and send a notification for the initial trust level of cmHandles if it is NONE.
*
* @param cmHandlesToBeCreated a list of cmHandles being created
*/
}
trustLevelPerCmHandleIdForCache.put(cmHandleId, initialTrustLevel);
if (TrustLevel.NONE.equals(initialTrustLevel)) {
- inventoryEventProducer.publishAvcEvent(cmHandleId,
+ inventoryEventProducer.sendAvcEvent(cmHandleId,
AVC_CHANGED_ATTRIBUTE_NAME,
AVC_NO_OLD_VALUE,
initialTrustLevel.name());
}
/**
- * Updates trust level of dmi plugin in the cache and publish notification for trust level of cmHandles if it
+ * Updates trust level of dmi plugin in the cache and sends notification for trust level of cmHandles if it
* has changed.
*
* @param dmiServiceName dmi service name
}
/**
- * Updates trust level of device in the cache and publish notification for trust level of device if it has
+ * Updates trust level of device in the cache and sends notification for trust level of device if it has
* changed.
*
* @param cmHandleId cm handle id
} else {
log.info("The trust level for Cm Handle: {} is now: {} ", notificationCandidateCmHandleId,
newEffectiveTrustLevel);
- inventoryEventProducer.publishAvcEvent(notificationCandidateCmHandleId,
+ inventoryEventProducer.sendAvcEvent(notificationCandidateCmHandleId,
AVC_CHANGED_ATTRIBUTE_NAME,
oldEffectiveTrustLevel.name(),
newEffectiveTrustLevel.name());
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.events.EventsPublisher;
+import org.onap.cps.events.EventsProducer;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data;
@RequiredArgsConstructor
public class InventoryEventProducer {
- private final EventsPublisher<CloudEvent> eventsPublisher;
+ private final EventsProducer<CloudEvent> eventsProducer;
@Value("${app.ncmp.avc.inventory-events-topic}")
private String ncmpInventoryEventsTopicName;
/**
- * Publish attribute value change event.
+ * Send attribute value change event.
*
* @param eventKey id of the cmHandle being registered
*/
- public void publishAvcEvent(final String eventKey, final String attributeName,
- final String oldAttributeValue, final String newAttributeValue) {
+ public void sendAvcEvent(final String eventKey, final String attributeName,
+ final String oldAttributeValue, final String newAttributeValue) {
final AvcEvent avcEvent = buildAvcEvent(attributeName, oldAttributeValue, newAttributeValue);
final Map<String, String> extensions = createAvcEventExtensions(eventKey);
.build()
.asCloudEvent();
- eventsPublisher.publishCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
+ eventsProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
}
private AvcEvent buildAvcEvent(final String attributeName,
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2023-2024 Nordix Foundation.
+ * Copyright (c) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-@SpringBootTest(classes = [EventsPublisher, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
class CmAvcEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher eventsPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer eventsProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
- CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsPublisher)
+ CmAvcEventConsumer acvEventConsumer = new CmAvcEventConsumer(eventsProducer)
@Autowired
JsonObjectMapper jsonObjectMapper
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2024-2025 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.v1.CloudEventBuilder
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.config.CpsApplicationContext
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.CmHandle
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_dmi.Data
@ContextConfiguration(classes = [CpsApplicationContext])
class DmiInEventProducerSpec extends Specification {
- def mockEventsPublisher = Mock(EventsPublisher)
+ def mockEventsProducer = Mock(EventsProducer)
- def objectUnderTest = new DmiInEventProducer(mockEventsPublisher)
+ def objectUnderTest = new DmiInEventProducer(mockEventsProducer)
- def 'Create and Publish Cm Notification Subscription DMI In Event'() {
+ def 'Create and Send Cm Notification Subscription DMI In Event'() {
given: 'a cm subscription for a dmi plugin'
def subscriptionId = 'test-subscription-id'
def dmiPluginName = 'test-dmiplugin'
def dmiInEvent = new DmiInEvent(data: new Data(cmHandles: [new CmHandle(cmhandleId: 'test-1', privateProperties: [:])]))
and: 'also we have target topic for dmiPlugin'
objectUnderTest.dmiInEventTopic = 'dmiplugin-test-topic'
- when: 'the event is published'
- objectUnderTest.publishDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)
+ when: 'the event is sent'
+ objectUnderTest.sendDmiInEvent(subscriptionId, dmiPluginName, eventType, dmiInEvent)
then: 'the event contains the required attributes'
- 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+ 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
args ->
{
assert args[0] == 'dmiplugin-test-topic'
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
expectedPersistenceCalls * mockDmiCacheHandler.persistIntoDatabasePerDmi('sub-1','test-dmi-plugin-name')
and: 'correct number of calls to map the ncmp out event'
1 * mockNcmpOutEventMapper.toNcmpOutEvent('sub-1', _)
- and: 'correct number of calls to publish the ncmp out event to client'
- 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false)
+ and: 'correct number of calls to send the ncmp out event to client'
+ 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('sub-1', 'subscriptionCreateResponse', _, false)
where: 'the following parameters are used'
scenario | subscriptionStatus | statusCode || expectedCacheCalls | expectedPersistenceCalls
'Accepted Status' | ACCEPTED | '1' || 1 | 1
/*
* ============LICENSE_START=======================================================
- * Copyright (c) 2024 Nordix Foundation.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
then: 'the subscription cache handler is called once'
1 * mockDmiCacheHandler.add('test-id', _)
- and: 'the events handler method to publish DMI event is called correct number of times with the correct parameters'
- testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.publishDmiInEvent(
+ and: 'the events handler method to send DMI event is called the correct number of times with the correct parameters'
+ testDmiSubscriptionsPerDmi.size() * mockDmiInEventProducer.sendDmiInEvent(
"test-id", "dmi-1", "subscriptionCreateRequest", testDmiInEvent)
and: 'we schedule to send the response after configured time from the cache'
- 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
+ 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
}
def 'Consume valid and Overlapping Cm Notification Subscription NcmpIn Event'() {
and: 'the subscription details are updated in the cache'
1 * mockDmiCacheHandler.updateDmiSubscriptionStatus('test-id', _, ACCEPTED)
and: 'we schedule to send the response after configured time from the cache'
- 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
+ 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', null, true)
}
def 'Consume valid and but non-unique CmNotificationSubscription create message'() {
"test-id", _) >> testNcmpOutEvent
when: 'the valid but non-unique event is consumed'
objectUnderTest.processSubscriptionCreateRequest(subscriptionId, predicates)
- then: 'the events handler method to publish DMI event is never called'
- 0 * mockDmiInEventProducer.publishDmiInEvent(_, _, _, _)
- and: 'the events handler method to publish NCMP out event is called once'
- 1 * mockNcmpOutEventProducer.publishNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false)
+ then: 'the events handler method to send DMI event is never called'
+ 0 * mockDmiInEventProducer.sendDmiInEvent(_, _, _, _)
+ and: 'the events handler method to send NCMP out event is called once'
+ 1 * mockNcmpOutEventProducer.sendNcmpOutEvent('test-id', 'subscriptionCreateResponse', testNcmpOutEvent, false)
}
def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message'() {
1 * mockInventoryPersistence.getYangModelCmHandle('ch-2') >> new YangModelCmHandle(dmiServiceName: 'dmi-2')
when: 'the subscription delete request is processed'
objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
- then: 'the method to publish a dmi event is called with correct parameters'
- 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_)
- 1 * mockDmiInEventProducer.publishDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_)
- and: 'the method to publish nmcp out event is called with correct parameters'
- 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true)
+ then: 'the method to send a dmi event is called with correct parameters'
+ 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId,'dmi-1','subscriptionDeleteRequest',_)
+ 1 * mockDmiInEventProducer.sendDmiInEvent(subscriptionId,'dmi-2','subscriptionDeleteRequest',_)
+ and: 'the method to send ncmp out event is called with correct parameters'
+ 1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, true)
}
def 'Delete a subscriber for fully overlapping subscriptions'() {
2 * mockDmiCacheHandler.get(subscriptionId) >> ['dmi-1':[:],'dmi-2':[:]]
when: 'the subscription delete request is processed'
objectUnderTest.processSubscriptionDeleteRequest(subscriptionId)
- then: 'the method to publish a dmi event is never called'
- 0 * mockDmiInEventProducer.publishDmiInEvent(_,_,_,_)
+ then: 'the method to send a dmi event is never called'
+ 0 * mockDmiInEventProducer.sendDmiInEvent(_,_,_,_)
and: 'the cache handler is called to remove subscriber from database per dmi'
1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-1')
1 * mockDmiCacheHandler.removeFromDatabase('test-id', 'dmi-2')
- and: 'the method to publish nmcp out event is called with correct parameters'
- 1 * mockNcmpOutEventProducer.publishNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false)
+ and: 'the method to send ncmp out event is called with correct parameters'
+ 1 * mockNcmpOutEventProducer.sendNcmpOutEvent(subscriptionId, 'subscriptionDeleteResponse', null, false)
}
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2025 Nordix Foundation
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.v1.CloudEventBuilder
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.config.CpsApplicationContext
import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler
import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.Data
@ContextConfiguration(classes = [CpsApplicationContext])
class NcmpOutEventProducerSpec extends Specification {
- def mockEventsPublisher = Mock(EventsPublisher)
+ def mockEventsProducer = Mock(EventsProducer)
def mockNcmpOutEventMapper = Mock(NcmpOutEventMapper)
def mockDmiCacheHandler = Mock(DmiCacheHandler)
- def objectUnderTest = new NcmpOutEventProducer(mockEventsPublisher, mockNcmpOutEventMapper, mockDmiCacheHandler)
+ def objectUnderTest = new NcmpOutEventProducer(mockEventsProducer, mockNcmpOutEventMapper, mockDmiCacheHandler)
def 'Create and #scenario Cm Notification Subscription NCMP out event'() {
given: 'a cm subscription response for the client'
def subscriptionId = 'test-subscription-id-2'
def eventType = 'subscriptionCreateResponse'
def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-2', acceptedTargets: ['ch-1', 'ch-2']))
- and: 'also we have target topic for publishing to client'
+ and: 'also we have target topic for sending to client'
objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
and: 'a deadline to an event'
objectUnderTest.dmiOutEventTimeoutInMs = 1000
- when: 'the event is published'
- objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled)
+ when: 'the event is sent'
+ objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, eventPublishingTaskToBeScheduled)
then: 'we conditionally wait for a while'
Thread.sleep(delayInMs)
then: 'the event contains the required attributes'
- 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+ 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
args ->
{
assert args[0] == 'client-test-topic'
}
where: 'following scenarios are considered'
scenario | delayInMs | eventPublishingTaskToBeScheduled
- 'publish event now' | 0 | false
- 'schedule and publish after the configured time ' | 1500 | true
+ 'send event now' | 0 | false
+ 'schedule and send after the configured time ' | 1500 | true
}
- def 'Schedule Cm Notification Subscription NCMP out event but later publish it on demand'() {
+ def 'Schedule Cm Notification Subscription NCMP out event but later send it on demand'() {
given: 'a cm subscription response for the client'
def subscriptionId = 'test-subscription-id-3'
def eventType = 'subscriptionCreateResponse'
def ncmpOutEvent = new NcmpOutEvent(data: new Data(subscriptionId: 'test-subscription-id-3', acceptedTargets: ['ch-2', 'ch-3']))
- and: 'also we have target topic for publishing to client'
+ and: 'also we have target topic for sending to client'
objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
and: 'a deadline to an event'
objectUnderTest.dmiOutEventTimeoutInMs = 1000
- when: 'the event is scheduled to be published'
- objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
+ when: 'the event is scheduled to be sent'
+ objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
then: 'we wait for 10ms and then we receive response from DMI'
Thread.sleep(10)
- and: 'we receive response from DMI so we publish the message on demand'
- objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
+ and: 'we receive response from DMI so we send the message on demand'
+ objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
then: 'the event contains the required attributes'
- 1 * mockEventsPublisher.publishCloudEvent(_, _, _) >> {
+ 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
args ->
{
assert args[0] == 'client-test-topic'
1 * mockDmiCacheHandler.removeAcceptedAndRejectedDmiSubscriptionEntries(subscriptionId)
}
- def 'No event published when NCMP out event is null'() {
+ def 'No event sent when NCMP out event is null'() {
given: 'a cm subscription response for the client'
def subscriptionId = 'test-subscription-id-3'
def eventType = 'subscriptionCreateResponse'
def ncmpOutEvent = null
- and: 'also we have target topic for publishing to client'
+ and: 'also we have target topic for sending to client'
objectUnderTest.ncmpOutEventTopic = 'client-test-topic'
and: 'a deadline to an event'
objectUnderTest.dmiOutEventTimeoutInMs = 1000
- when: 'the event is scheduled to be published'
- objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
+ when: 'the event is scheduled to be sent'
+ objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, true)
then: 'we wait for 10ms and then we receive response from DMI'
Thread.sleep(10)
- and: 'we receive NO response from DMI so we publish the message on demand'
- objectUnderTest.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
- and: 'no event published'
- 0 * mockEventsPublisher.publishCloudEvent(*_)
+ and: 'we receive NO response from DMI so we send the message on demand'
+ objectUnderTest.sendNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false)
+ and: 'no event sent'
+ 0 * mockEventsProducer.sendCloudEvent(*_)
}
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2021-2025 Nordix Foundation
+ * Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
* Modifications Copyright (C) 2022 Bell Canada
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
package org.onap.cps.ncmp.impl.data
import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.api.data.models.CmResourceAddress
import org.onap.cps.ncmp.api.data.models.DataOperationRequest
import org.onap.cps.ncmp.api.exceptions.DmiClientRequestException
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
@SpringBootTest
-@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext, DmiProperties, DmiDataOperations, PolicyExecutor])
+@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext, DmiProperties, DmiDataOperations, PolicyExecutor])
class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
def NO_TOPIC = null
DmiDataOperations objectUnderTest
@SpringBean
- EventsPublisher eventsPublisher = Stub()
+ EventsProducer eventsProducer = Stub()
@SpringBean
PolicyExecutor policyExecutor = Mock()
def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
dataOperationRequest.dataOperationDefinitions[0].cmHandleReferences = [cmHandleId]
- and: 'the published cloud event will be captured'
+ and: 'the sent cloud event will be captured'
def actualDataOperationCloudEvent = null
- eventsPublisher.publishCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
+ eventsProducer.sendCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
and: 'a DMI client request exception is thrown when DMI service is called'
mockDmiRestClient.asynchronousPostOperationWithJsonData(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) }
when: 'attempt to get resource data for group of cm handles is invoked'
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.mapstruct.factory.Mappers
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.testcontainers.spock.Testcontainers
import java.time.Duration
-@SpringBootTest(classes = [EventsPublisher, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventsProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher cpsAsyncRequestResponseEventPublisher =
- new EventsPublisher<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
+ EventsProducer cpsAsyncRequestResponseEventProducer =
+ new EventsProducer<NcmpAsyncRequestResponseEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate);
@SpringBean
@SpringBean
DmiAsyncRequestResponseEventConsumer dmiAsyncRequestResponseEventConsumer =
- new DmiAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventPublisher,
+ new DmiAsyncRequestResponseEventConsumer(cpsAsyncRequestResponseEventProducer,
ncmpAsyncRequestResponseEventMapper)
@Autowired
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.internals.RecordHeaders
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-@SpringBootTest(classes = [EventsPublisher, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
+@SpringBootTest(classes = [EventsProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
@Testcontainers
@DirtiesContext
class DataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsPublisher asyncDataOperationEventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer asyncDataOperationEventProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@SpringBean
- DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventPublisher)
+ DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
@Autowired
JsonObjectMapper jsonObjectMapper
def static clientTopic = 'client-topic'
def static dataOperationType = 'org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent'
- def 'Consume and publish event to client specified topic'() {
+ def 'Consume and send event to client specified topic'() {
given: 'consumer subscribing to client topic'
cloudEventKafkaConsumer.subscribe([clientTopic])
and: 'consumer record for data operation event'
def consumerRecordIn = createConsumerRecord(dataOperationType)
- when: 'the data operation event is consumed and published to client specified topic'
- objectUnderTest.consumeAndPublish(consumerRecordIn)
+ when: 'the data operation event is consumed and sent to client specified topic'
+ objectUnderTest.consumeAndSend(consumerRecordIn)
and: 'the client specified topic is polled'
def consumerRecordOut = cloudEventKafkaConsumer.poll(Duration.ofMillis(1500))[0]
then: 'verify cloud compliant headers'
assert KafkaHeaders.getParsedKafkaHeader(consumerRecordOutHeaders, 'ce_destination') == clientTopic
and: 'map consumer record to expected event type'
def dataOperationResponseEvent = toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
- and: 'verify published response data properties'
+ and: 'verify sent response data properties'
def response = dataOperationResponseEvent.data.responses[0]
response.operationId == 'some-operation-id'
response.statusCode == 'any-success-status-code'
package org.onap.cps.ncmp.impl.data.async
import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.config.KafkaConfig
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
import org.onap.cps.ncmp.utils.events.ConsumerBaseSpec
class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
@SpringBean
- EventsPublisher mockEventsPublisher = Mock()
+ EventsProducer mockEventsProducer = Mock()
@SpringBean
NcmpAsyncRequestResponseEventMapper mapper = Stub()
then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
TimeUnit.MILLISECONDS.sleep(300)
and: 'event is not consumed'
- 0 * mockEventsPublisher.publishEvent(*_)
+ 0 * mockEventsProducer.sendEvent(*_)
}
def 'Legacy event consumer with valid legacy event.'() {
given: 'a legacy event'
DmiAsyncRequestResponseEvent legacyEvent = new DmiAsyncRequestResponseEvent(eventId:'legacyEventId', eventTarget:'legacyEventTarget')
- and: 'a flag to track the publish event call'
- def publishEventMethodCalled = false
- and: 'the (mocked) events publisher will use the flag to indicate if it is called'
- mockEventsPublisher.publishEvent(*_) >> {
- publishEventMethodCalled = true
+ and: 'a flag to track the send event call'
+ def sendEventMethodCalled = false
+ and: 'the (mocked) events producer will use the flag to indicate if it is called'
+ mockEventsProducer.sendEvent(*_) >> {
+ sendEventMethodCalled = true
}
when: 'send the cloud event'
legacyEventKafkaTemplate.send(topic, legacyEvent)
then: 'the event is consumed by the (legacy) AsynRestRequest consumer'
new PollingConditions().within(1) {
- assert publishEventMethodCalled == true
+ assert sendEventMethodCalled == true
}
}
.withType(eventType)
.withSource(URI.create('some-source'))
.build()
- and: 'a flag to track the publish event call'
- def publishEventMethodCalled = false
- and: 'the (mocked) events publisher will use the flag to indicate if it is called'
- mockEventsPublisher.publishCloudEvent(*_) >> {
- publishEventMethodCalled = true
+ and: 'a flag to track the send event call'
+ def sendEventMethodCalled = false
+ and: 'the (mocked) events producer will use the flag to indicate if it is called'
+ mockEventsProducer.sendCloudEvent(*_) >> {
+ sendEventMethodCalled = true
}
when: 'send the cloud event'
cloudEventKafkaTemplate.send(topic, cloudEvent)
then: 'the event has only been forwarded for the correct type'
new PollingConditions(initialDelay: 0.3).within(1) {
- assert publishEventMethodCalled == expectCallToPublishEventMethod
+ assert sendEventMethodCalled == expectCallToSendEventMethod
}
where: 'the following event types are used'
- eventType || expectCallToPublishEventMethod
+ eventType || expectCallToSendEventMethod
'DataOperationEvent' || true
'other type' || false
'any type contain the word "DataOperationEvent"' || true
then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
TimeUnit.MILLISECONDS.sleep(300)
and: 'the event is not processed by this consumer'
- 0 * mockEventsPublisher.publishCloudEvent(*_)
+ 0 * mockEventsProducer.sendCloudEvent(*_)
}
}
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.config.KafkaConfig
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
class SerializationIntegrationSpec extends ConsumerBaseSpec {
@SpringBean
- EventsPublisher mockEventsPublisher = Mock()
+ EventsProducer mockEventsProducer = Mock()
@SpringBean
NcmpAsyncRequestResponseEventMapper mapper = Stub() { toNcmpAsyncEvent(_) >> new NcmpAsyncRequestResponseEvent(eventId: 'my-event-id', eventTarget: 'some client topic')}
def 'Forwarding DataOperation Event Data.'() {
given: 'a data operation cloud event'
def cloudEvent = createCloudEvent()
- and: 'a flag to track the publish cloud event call'
- def publishCloudEventMethodCalled = false
- and: 'the (mocked) events publisher will use the flag to indicate if it is called and will capture the cloud event'
- mockEventsPublisher.publishCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> {
- publishCloudEventMethodCalled = true
+ and: 'a flag to track the send cloud event call'
+ def sendCloudEventMethodCalled = false
+ and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the cloud event'
+ mockEventsProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> {
+ sendCloudEventMethodCalled = true
}
when: 'send the event'
cloudEventKafkaTemplate.send(topic, cloudEvent)
then: 'the event has been forwarded'
new PollingConditions().within(1) {
- assert publishCloudEventMethodCalled == true
+ assert sendCloudEventMethodCalled == true
}
}
def 'Forwarding AsyncRestRequestResponse Event Data.'() {
given: 'async request response legacy event'
def dmiAsyncRequestResponseEvent = new DmiAsyncRequestResponseEvent(eventId: 'my-event-id',eventTarget: 'some client topic')
- and: 'a flag to track the publish event call'
- def publishEventMethodCalled = false
- and: 'the (mocked) events publisher will use the flag to indicate if it is called and will capture the event'
- mockEventsPublisher.publishEvent(*_) >> {
- publishEventMethodCalled = true
+ and: 'a flag to track the send event call'
+ def sendEventMethodCalled = false
+ and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event'
+ mockEventsProducer.sendEvent(*_) >> {
+ sendEventMethodCalled = true
}
when: 'send the event'
legacyEventKafkaTemplate.send(topic, dmiAsyncRequestResponseEvent)
then: 'the event has been forwarded'
new PollingConditions().within(1) {
- assert publishEventMethodCalled == true
+ assert sendEventMethodCalled == true
}
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023-2024 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.api.data.models.DataOperationRequest
import org.onap.cps.ncmp.api.data.models.OperationType
import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder
import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-@ContextConfiguration(classes = [EventsPublisher, CpsApplicationContext])
+@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext])
class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
def static clientTopic = 'my-topic-name'
JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
@SpringBean
- EventsPublisher eventPublisher = new EventsPublisher<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer eventProducer = new EventsProducer<CloudEvent>(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
def 'Process per data operation request with #serviceName.'() {
given: 'data operation request with 3 operations'
assert cmHandlesInRequestBody[0].moduleSetTag == 'module-set-tag1'
}
- def 'Process per data operation request with non-ready, non-existing cm handle and publish event to client specified topic'() {
+ def 'Process per data operation request with non-ready, non-existing cm handle and send event to client specified topic'() {
given: 'consumer subscribing to client topic'
def cloudEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('test-1', CloudEventDeserializer))
cloudEventKafkaConsumer.subscribe([clientTopic])
toTargetEvent(consumerRecordOut.value(), DataOperationEvent.class)
and: 'data operation response event response size is 3'
dataOperationResponseEvent.data.responses.size() == 3
- and: 'verify published data operation response as json string'
+ and: 'verify sent data operation response as json string'
def dataOperationResponseEventJson = TestUtils.getResourceFileContent('dataOperationResponseEvent.json')
jsonObjectMapper.asJsonString(dataOperationResponseEvent.data.responses) == dataOperationResponseEventJson
}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.events.lcm.v1.Event
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
import org.onap.cps.ncmp.utils.TestUtils
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
-class LcmEventsPublisherSpec extends MessagingBaseSpec {
+class EventsProducerSpec extends MessagingBaseSpec {
def legacyEventKafkaConsumer = new KafkaConsumer<>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
def testTopic = 'ncmp-events-test'
@SpringBean
- EventsPublisher<LcmEvent> lcmEventsPublisher = new EventsPublisher(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
+ EventsProducer<LcmEvent> eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate)
@Autowired
JsonObjectMapper jsonObjectMapper
- def 'Produce and Consume Lcm Event'() {
+ def 'Produce and Consume Event'() {
given: 'event key and event data'
def eventKey = 'lcm'
def eventId = 'test-uuid'
eventSchemaVersion: eventSchemaVersion]
and: 'consumer has a subscription'
legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
- when: 'an event is published'
- lcmEventsPublisher.publishEvent(testTopic, eventKey, eventHeader, eventData)
+ when: 'an event is sent'
+ eventsProducer.sendEvent(testTopic, eventKey, eventHeader, eventData)
and: 'topic is polled'
def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.events.lcm.v1.Event
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader
class LcmEventsProducerSpec extends Specification {
- def mockLcmEventsPublisher = Mock(EventsPublisher)
+ def mockLcmEventsProducer = Mock(EventsProducer)
def mockJsonObjectMapper = Mock(JsonObjectMapper)
def meterRegistry = new SimpleMeterRegistry()
- def objectUnderTest = new LcmEventsProducer(mockLcmEventsPublisher, mockJsonObjectMapper, meterRegistry)
+ def objectUnderTest = new LcmEventsProducer(mockLcmEventsProducer, mockJsonObjectMapper, meterRegistry)
- def 'Create and Publish lcm event where events are #scenario'() {
+ def 'Create and send lcm event where events are #scenario'() {
given: 'a cm handle id, Lcm Event, and headers'
def cmHandleId = 'test-cm-handle-id'
def eventId = UUID.randomUUID().toString()
objectUnderTest.notificationsEnabled = notificationsEnabled
and: 'lcm event header is transformed to headers map'
mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId]
- when: 'service is called to publish lcm event'
+ when: 'service is called to send lcm event'
objectUnderTest.publishLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader)
then: 'producer is called #expectedTimesMethodCalled times'
- expectedTimesMethodCalled * mockLcmEventsPublisher.publishEvent(_, cmHandleId, _, lcmEvent) >> {
+ expectedTimesMethodCalled * mockLcmEventsProducer.sendEvent(_, cmHandleId, _, lcmEvent) >> {
args -> {
def eventHeaders = (args[2] as Map<String,Object>)
assert eventHeaders.containsKey('eventId')
def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
objectUnderTest.notificationsEnabled = true
when: 'producer set to throw an exception'
- mockLcmEventsPublisher.publishEvent(_, _, _, _) >> { throw new KafkaException('publishing failed')}
+ mockLcmEventsProducer.sendEvent(_, _, _, _) >> { throw new KafkaException('sending failed')}
and: 'an event is sent'
objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
then: 'the exception is just logged and not bubbled up'
when: 'method to register to the cache is called'
objectUnderTest.registerCmHandles(cmHandleModelsToBeCreated)
then: 'no notification sent'
- 0 * mockInventoryEventProducer.publishAvcEvent(*_)
+ 0 * mockInventoryEventProducer.sendAvcEvent(*_)
and: 'both cm handles are in the cache and are trusted'
assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
assert trustLevelPerCmHandleId.get('ch-2') == TrustLevel.COMPLETE
when: 'method to register to the cache is called'
objectUnderTest.registerCmHandles(cmHandleModelsToBeCreated)
then: 'notification is sent'
- 1 * mockInventoryEventProducer.publishAvcEvent(*_)
+ 1 * mockInventoryEventProducer.sendAvcEvent(*_)
}
def 'Dmi trust level updated'() {
when: 'the update is handled'
objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.NONE)
then: 'notification is sent'
- 1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'COMPLETE', 'NONE')
+ 1 * mockInventoryEventProducer.sendAvcEvent('ch-1', 'trustLevel', 'COMPLETE', 'NONE')
and: 'the dmi in the cache is not trusted'
assert trustLevelPerDmiPlugin.get('my-dmi') == TrustLevel.NONE
}
when: 'the update is handled'
objectUnderTest.updateDmi('my-dmi', ['ch-1'], TrustLevel.COMPLETE)
then: 'no notification is sent'
- 0 * mockInventoryEventProducer.publishAvcEvent(*_)
+ 0 * mockInventoryEventProducer.sendAvcEvent(*_)
and: 'the dmi in the cache is trusted'
assert trustLevelPerDmiPlugin.get('my-dmi') == TrustLevel.COMPLETE
}
then: 'the cm handle in the cache is trusted'
assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.COMPLETE
and: 'notification is sent'
- 1 * mockInventoryEventProducer.publishAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE')
+ 1 * mockInventoryEventProducer.sendAvcEvent('ch-1', 'trustLevel', 'NONE', 'COMPLETE')
}
def 'CmHandle trust level updated with same value'() {
then: 'the cm handle in the cache is not trusted'
assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.NONE
and: 'no notification is sent'
- 0 * mockInventoryEventProducer.publishAvcEvent(*_)
+ 0 * mockInventoryEventProducer.sendAvcEvent(*_)
}
def 'Dmi trust level restored to complete with non trusted CmHandle'() {
then: 'the cm handle in the cache is still NONE'
assert trustLevelPerCmHandleId.get('ch-1') == TrustLevel.NONE
and: 'no notification is sent'
- 0 * mockInventoryEventProducer.publishAvcEvent(*_)
+ 0 * mockInventoryEventProducer.sendAvcEvent(*_)
}
def 'Apply effective trust level among CmHandle and dmi plugin'() {
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
-import org.onap.cps.events.EventsPublisher
+import org.onap.cps.events.EventsProducer
import org.onap.cps.ncmp.config.CpsApplicationContext
import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc
import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent
@ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper])
class InventoryEventProducerSpec extends MessagingBaseSpec {
- def mockEventsPublisher = Mock(EventsPublisher<CloudEvent>)
- def objectUnderTest = new InventoryEventProducer(mockEventsPublisher)
+ def mockEventsProducer = Mock(EventsProducer<CloudEvent>)
+ def objectUnderTest = new InventoryEventProducer(mockEventsProducer)
- def 'Publish an attribute value change event'() {
+ def 'Send an attribute value change event'() {
given: 'the event key'
def someEventKey = 'someEventKey'
and: 'the name of the attribute being changed'
def someOldAttributeValue = 'someOldAttributeValue'
and: 'the new value of the attribute'
def someNewAttributeValue = 'someNewAttributeValue'
- when: 'an attribute value change event is published'
- objectUnderTest.publishAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue)
- then: 'the cloud event publisher is invoked with the correct data'
- 1 * mockEventsPublisher.publishCloudEvent(_, someEventKey,
+ when: 'an attribute value change event is sent'
+ objectUnderTest.sendAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue)
+ then: 'the cloud event producer is invoked with the correct data'
+ 1 * mockEventsProducer.sendCloudEvent(_, someEventKey,
cloudEvent -> {
def actualAvcs = CloudEventMapper.toTargetEvent(cloudEvent, AvcEvent.class).data.attributeValueChange
def expectedAvc = new Avc(attributeName: someAttributeName,
@RequiredArgsConstructor
public class CpsDataUpdateEventsProducer {
- private final EventsPublisher<CpsDataUpdatedEvent> eventsPublisher;
+ private final EventsProducer<CpsDataUpdatedEvent> eventsProducer;
private final CpsNotificationService cpsNotificationService;
private boolean notificationsEnabled;
/**
- * Publish the cps data update event with header to the public topic.
+ * Send the cps data update event with header to the public topic.
*
* @param anchor Anchor of the updated data
* @param xpath xpath of the updated data
* @param operation operation performed on the data
* @param observedTimestamp timestamp when data was updated.
*/
- @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to publish Data Update event")
- public void publishCpsDataUpdateEvent(final Anchor anchor, final String xpath,
- final Operation operation, final OffsetDateTime observedTimestamp) {
+ @Timed(value = "cps.dataupdate.events.publish", description = "Time taken to send Data Update event")
+ public void sendCpsDataUpdateEvent(final Anchor anchor, final String xpath,
+ final Operation operation, final OffsetDateTime observedTimestamp) {
if (notificationsEnabled && cpsChangeEventNotificationsEnabled && isNotificationEnabledForAnchor(anchor)) {
final CpsDataUpdatedEvent cpsDataUpdatedEvent = createCpsDataUpdatedEvent(anchor,
observedTimestamp, xpath, operation);
final CloudEvent cpsDataUpdatedEventAsCloudEvent =
CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent)
.extensions(extensions).build().asCloudEvent();
- eventsPublisher.publishCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
+ eventsProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
} else {
log.debug("State of Overall Notifications : {} and Cps Change Event Notifications : {}",
notificationsEnabled, cpsChangeEventNotificationsEnabled);
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2024 Nordix Foundation
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.springframework.util.SerializationUtils;
/**
- * EventsPublisher to publish events.
+ * EventsProducer to send events.
*/
@Slf4j
@Service
@RequiredArgsConstructor
-public class EventsPublisher<T> {
+public class EventsProducer<T> {
/**
* KafkaTemplate for legacy (non-cloud) events.
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
/**
- * Generic CloudEvent publisher.
+ * Generic CloudEvent sender.
*
* @param topicName valid topic name
* @param eventKey message key
* @param event message payload
*/
- public void publishCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
+ public void sendCloudEvent(final String topicName, final String eventKey, final CloudEvent event) {
final CompletableFuture<SendResult<String, CloudEvent>> eventFuture =
cloudEventKafkaTemplate.send(topicName, eventKey, event);
eventFuture.whenComplete((result, e) -> {
if (e == null) {
- log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
+ log.debug("Successfully sent event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
result.getProducerRecord().value());
} else {
- log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage());
+ log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
}
});
}
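Illustration only, not part of this change: a minimal caller-side sketch of sendCloudEvent, assuming the io.cloudevents CloudEventBuilder already imported elsewhere in this patch plus java.net.URI, java.nio.charset.StandardCharsets and java.util.UUID; the topic name, key, event type, source and payload are hypothetical placeholders.
    // Build a CloudEvent and hand it to the producer; delivery is asynchronous.
    final CloudEvent exampleCloudEvent = CloudEventBuilder.v1()
        .withId(UUID.randomUUID().toString())
        .withType("org.onap.cps.example.ExampleEvent")          // hypothetical event type
        .withSource(URI.create("example-source"))               // hypothetical source
        .withData("application/json", "{\"example\":\"data\"}".getBytes(StandardCharsets.UTF_8))
        .build();
    eventsProducer.sendCloudEvent("example-topic", "example-event-key", exampleCloudEvent);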
/**
- * Generic Event publisher.
+ * Generic Event sender.
* Note: Cloud events should be used. This will be addressed as part of https://lf-onap.atlassian.net/browse/CPS-1717.
*
* @param topicName valid topic name
* @param eventKey message key
* @param event message payload
*/
- public void publishEvent(final String topicName, final String eventKey, final T event) {
+ public void sendEvent(final String topicName, final String eventKey, final T event) {
final CompletableFuture<SendResult<String, T>> eventFuture =
legacyKafkaEventTemplate.send(topicName, eventKey, event);
handleLegacyEventCallback(topicName, eventFuture);
}
/**
- * Generic Event Publisher with headers.
+ * Generic Event sender with headers.
*
* @param topicName valid topic name
* @param eventKey message key
* @param eventHeaders event headers
* @param event message payload
*/
- public void publishEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) {
+ public void sendEvent(final String topicName, final String eventKey, final Headers eventHeaders, final T event) {
final ProducerRecord<String, T> producerRecord =
new ProducerRecord<>(topicName, null, eventKey, event, eventHeaders);
}
/**
- * Generic Event Publisher with headers.
+ * Generic Event sender with headers.
*
* @param topicName valid topic name
* @param eventKey message key
* @param eventHeaders map of event headers
* @param event message payload
*/
- public void publishEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
- final T event) {
+ public void sendEvent(final String topicName, final String eventKey, final Map<String, Object> eventHeaders,
+ final T event) {
- publishEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
+ sendEvent(topicName, eventKey, convertToKafkaHeaders(eventHeaders), event);
}
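Illustration only, not part of this change: the convertToKafkaHeaders helper called above is not shown in this excerpt. A minimal sketch, assuming each header value is serialized to bytes with the SerializationUtils already imported in this file and collected into a RecordHeaders instance (org.apache.kafka.common.header.internals), could look like this; the actual implementation may differ.
    // Hypothetical sketch of the Map-to-Headers conversion used by the overload above.
    private Headers convertToKafkaHeaders(final Map<String, Object> eventMessageHeaders) {
        final Headers eventHeaders = new RecordHeaders();
        eventMessageHeaders.forEach((headerName, headerValue) ->
            eventHeaders.add(headerName, SerializationUtils.serialize(headerValue)));
        return eventHeaders;
    }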
private void handleLegacyEventCallback(final String topicName,
final CompletableFuture<SendResult<String, T>> eventFuture) {
eventFuture.whenComplete((result, e) -> {
if (e == null) {
- log.debug("Successfully published event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
+ log.debug("Successfully sent event to topic : {} , Event : {}", result.getRecordMetadata().topic(),
result.getProducerRecord().value());
} else {
- log.error("Unable to publish event to topic : {} due to {}", topicName, e.getMessage());
+ log.error("Unable to send event to topic : {} due to {}", topicName, e.getMessage());
}
});
}
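Illustration only, not part of this change: a minimal sketch of how a producing component typically wires the renamed EventsProducer via constructor injection, mirroring the pattern used by CpsDataUpdateEventsProducer and InventoryEventProducer elsewhere in this patch; the class name, event type and topic name are hypothetical.
@Service
@RequiredArgsConstructor
public class ExampleEventsProducer {

    // Shared generic producer; ExampleEvent and the topic name are hypothetical placeholders.
    private final EventsProducer<ExampleEvent> eventsProducer;

    public void sendExampleEvent(final String eventKey, final ExampleEvent exampleEvent) {
        // Delegates to the shared producer; send failures are logged asynchronously by EventsProducer itself.
        eventsProducer.sendEvent("example-topic", eventKey, exampleEvent);
    }
}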
final Operation operation,
final OffsetDateTime observedTimestamp) {
try {
- cpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp);
+ cpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp);
} catch (final Exception exception) {
log.error("Failed to send message to notification service", exception);
}
@ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper])
class CpsDataUpdateEventsProducerSpec extends Specification {
- def mockEventsPublisher = Mock(EventsPublisher)
+ def mockEventsProducer = Mock(EventsProducer)
def objectMapper = new ObjectMapper();
def mockCpsNotificationService = Mock(CpsNotificationService)
- def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsPublisher, mockCpsNotificationService)
+ def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsProducer, mockCpsNotificationService)
def setup() {
mockCpsNotificationService.isNotificationEnabled('dataspace01', 'anchor01') >> true
objectUnderTest.topicName = 'cps-core-event'
}
- def 'Create and Publish cps update event where events are #scenario.'() {
+ def 'Create and send cps update event where events are #scenario.'() {
given: 'an anchor, operation and observed timestamp'
def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
def operation = operationInRequest
objectUnderTest.notificationsEnabled = true
and: 'cpsChangeEventNotificationsEnabled is also true'
objectUnderTest.cpsChangeEventNotificationsEnabled = true
- when: 'service is called to publish data update event'
- objectUnderTest.publishCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp)
+ when: 'service is called to send data update event'
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, operation, observedTimestamp)
then: 'the event contains the required attributes'
- 1 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
+ 1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
args ->
{
def cpsDataUpdatedEvent = (args[2] as CloudEvent)
'non root node xpath and delete operation' | '/test/path' | DELETE || UPDATE
}
- def 'Publish cps update event when no timestamp provided.'() {
+ def 'Send cps update event when no timestamp provided.'() {
given: 'an anchor, operation and null timestamp'
def anchor = new Anchor('anchor01', 'dataspace01', 'schema01');
def observedTimestamp = null
objectUnderTest.notificationsEnabled = true
and: 'cpsChangeEventNotificationsEnabled is true'
objectUnderTest.cpsChangeEventNotificationsEnabled = true
- when: 'service is called to publish data update event'
- objectUnderTest.publishCpsDataUpdateEvent(anchor, '/', CREATE, observedTimestamp)
- then: 'the event is published'
- 1 * mockEventsPublisher.publishCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
+ when: 'service is called to send data update event'
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE, observedTimestamp)
+ then: 'the event is sent'
+ 1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
}
- def 'Enabling and disabling publish cps update events.'() {
+ def 'Enabling and disabling sending cps update events.'() {
given: 'a different anchor'
def anchor = new Anchor('anchor02', 'some dataspace', 'some schema');
and: 'notificationsEnabled is #notificationsEnabled'
objectUnderTest.cpsChangeEventNotificationsEnabled = cpsChangeEventNotificationsEnabled
and: 'notification service enabled is: #cpsNotificationServiceisNotificationEnabled'
mockCpsNotificationService.isNotificationEnabled(_, 'anchor02') >> cpsNotificationServiceisNotificationEnabled
- when: 'service is called to publish data update event'
- objectUnderTest.publishCpsDataUpdateEvent(anchor, '/', CREATE, null)
- then: 'the event is only published when all related flags are true'
- expectedCallsToPublisher * mockEventsPublisher.publishCloudEvent(*_)
+ when: 'service is called to send data update event'
+ objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE, null)
+ then: 'the event is only sent when all related flags are true'
+ expectedCallsToProducer * mockEventsProducer.sendCloudEvent(*_)
where: 'the following flags are used'
- notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled || expectedCallsToPublisher
+ notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled || expectedCallsToProducer
false | true | true || 0
true | false | true || 0
true | true | false || 0
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.RecordHeader
import org.apache.kafka.common.header.internals.RecordHeaders
-import org.junit.jupiter.api.AfterEach
-import org.junit.jupiter.api.BeforeEach
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
import java.util.concurrent.CompletableFuture
-class EventsPublisherSpec extends Specification {
+class EventsProducerSpec extends Specification {
def legacyKafkaTemplateMock = Mock(KafkaTemplate)
def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
def logger = Spy(ListAppender<ILoggingEvent>)
void setup() {
- def setupLogger = ((Logger) LoggerFactory.getLogger(EventsPublisher.class))
+ def setupLogger = ((Logger) LoggerFactory.getLogger(EventsProducer.class))
setupLogger.setLevel(Level.DEBUG)
setupLogger.addAppender(logger)
logger.start()
}
void cleanup() {
- ((Logger) LoggerFactory.getLogger(EventsPublisher.class)).detachAndStopAllAppenders()
+ ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
}
- def objectUnderTest = new EventsPublisher(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate)
+ def objectUnderTest = new EventsProducer(legacyKafkaTemplateMock, mockCloudEventKafkaTemplate)
- def 'Publish Cloud Event'() {
- given: 'a successfully published event'
+ def 'Send Cloud Event'() {
+ given: 'a successfully sent event'
def eventFuture = CompletableFuture.completedFuture(
new SendResult(
new ProducerRecord('some-topic', 'some-value'),
)
def someCloudEvent = Mock(CloudEvent)
1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFuture
- when: 'publishing the cloud event'
- objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent)
+ when: 'sending the cloud event'
+ objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
then: 'the correct debug message is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+ assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
}
- def 'Publish Cloud Event with Exception'() {
+ def 'Send Cloud Event with Exception'() {
given: 'a failed event'
def eventFutureWithFailure = new CompletableFuture<SendResult<String, String>>()
eventFutureWithFailure.completeExceptionally(new RuntimeException('some exception'))
def someCloudEvent = Mock(CloudEvent)
1 * mockCloudEventKafkaTemplate.send('some-topic', 'some-event-key', someCloudEvent) >> eventFutureWithFailure
- when: 'publishing the cloud event'
- objectUnderTest.publishCloudEvent('some-topic', 'some-event-key', someCloudEvent)
+ when: 'sending the cloud event'
+ objectUnderTest.sendCloudEvent('some-topic', 'some-event-key', someCloudEvent)
then: 'the correct error message is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.ERROR
- assert lastLoggingEvent.formattedMessage.contains('Unable to publish event')
+ assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
}
- def 'Publish Legacy Event'() {
- given: 'a successfully published event'
+ def 'Send Legacy Event'() {
+ given: 'a successfully sent event'
def eventFuture = CompletableFuture.completedFuture(
new SendResult(
new ProducerRecord('some-topic', 'some-value'),
)
def someEvent = Mock(Object)
1 * legacyKafkaTemplateMock.send('some-topic', 'some-event-key', someEvent) >> eventFuture
- when: 'publishing the cloud event'
- objectUnderTest.publishEvent('some-topic', 'some-event-key', someEvent)
+ when: 'sending the legacy event'
+ objectUnderTest.sendEvent('some-topic', 'some-event-key', someEvent)
then: 'the correct debug message is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+ assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
}
- def 'Publish Legacy Event with Headers as Map'() {
- given: 'a successfully published event'
+ def 'Send Legacy Event with Headers as Map'() {
+ given: 'a successfully sent event'
def sampleEventHeaders = ['k1': SerializationUtils.serialize('v1')]
def eventFuture = CompletableFuture.completedFuture(
new SendResult(
)
)
def someEvent = Mock(Object.class)
- when: 'publishing the legacy event'
- objectUnderTest.publishEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
- then: 'event is published'
+ when: 'sending the legacy event'
+ objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
+ then: 'event is sent'
1 * legacyKafkaTemplateMock.send(_) >> eventFuture
and: 'the correct debug message is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+ assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
}
- def 'Publish Legacy Event with Record Headers'() {
- given: 'a successfully published event'
+ def 'Send Legacy Event with Record Headers'() {
+ given: 'a successfully sent event'
def sampleEventHeaders = new RecordHeaders([new RecordHeader('k1', SerializationUtils.serialize('v1'))])
def sampleProducerRecord = new ProducerRecord('some-topic', null, 'some-key', 'some-value', sampleEventHeaders)
def eventFuture = CompletableFuture.completedFuture(
)
)
def someEvent = Mock(Object.class)
- when: 'publishing the legacy event'
- objectUnderTest.publishEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
- then: 'event is published'
+ when: 'sending the legacy event'
+ objectUnderTest.sendEvent('some-topic', 'some-event-key', sampleEventHeaders, someEvent)
+ then: 'event is sent'
1 * legacyKafkaTemplateMock.send(_) >> eventFuture
and: 'the correct debug message is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+ assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
}
def 'Handle Legacy Event Callback'() {
- given: 'an event is successfully published'
+ given: 'an event is successfully sent'
def eventFuture = CompletableFuture.completedFuture(
new SendResult(
new ProducerRecord('some-topic', 'some-value'),
then: 'the correct debug message is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.DEBUG
- assert lastLoggingEvent.formattedMessage.contains('Successfully published event')
+ assert lastLoggingEvent.formattedMessage.contains('Successfully sent event')
}
def 'Handle Legacy Event Callback with Exception'() {
- given: 'a failure to publish an event'
+ given: 'a failure to send an event'
def eventFutureWithFailure = new CompletableFuture<SendResult<String, String>>()
eventFutureWithFailure.completeExceptionally(new RuntimeException('some exception'))
when: 'handling legacy event callback'
then: 'the correct error message is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.ERROR
- assert lastLoggingEvent.formattedMessage.contains('Unable to publish event')
+ assert lastLoggingEvent.formattedMessage.contains('Unable to send event')
}
def 'Convert to kafka headers'() {
and: 'the persistence service method is invoked with the correct parameters'
1 * mockCpsDataPersistenceService.deleteDataNodes(dataspaceName, _ as Collection<String>)
and: 'a data update event is sent for each anchor'
- 1 * mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor1, '/', DELETE, observedTimestamp)
- 1 * mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(anchor2, '/', DELETE, observedTimestamp)
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor1, '/', DELETE, observedTimestamp)
+ 1 * mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(anchor2, '/', DELETE, observedTimestamp)
}
def "Validating #scenario when dry run is enabled."() {
1 * mockCpsDataPersistenceService.lockAnchor('some-sessionId', 'some-dataspaceName', 'some-anchorName', 250L)
}
- def 'Exception is thrown while publishing the notification.'(){
+ def 'Exception is thrown while sending the notification.'(){
given: 'schema set for given anchor and dataspace references test-tree model'
setupSchemaSetMocks('test-tree.yang')
- when: 'publisher set to throw an exception'
- mockCpsDataUpdateEventsProducer.publishCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("publishing failed")}
+ when: 'the producer throws an exception while sending the event'
+ mockCpsDataUpdateEventsProducer.sendCpsDataUpdateEvent(_, _, _, _) >> { throw new Exception("Sending failed")}
and: 'an update operation is performed'
objectUnderTest.updateNodeLeaves(dataspaceName, anchorName, '/', '{"test-tree": {"branch": []}}', observedTimestamp, ContentType.JSON)
then: 'the exception is not bubbled up'