From 9c57debe4cf97f9175894c6f1476a7015555070d Mon Sep 17 00:00:00 2001 From: ToineSiebelink Date: Wed, 3 Dec 2025 10:47:52 +0000 Subject: [PATCH] refactor: Remove LcmEventHeader parameter from sendLcmEvent method - Remove LcmEventHeader parameter from LcmEventsProducer.sendLcmEvent() - Extract headers directly from LcmEvent object using new extractHeadersAsMap() method - Remove json definition of now unnecessary lcm event header object - Fix bug(?) related to recording metrics when sending event failed - Removed unnecessary null checks for status during metric record - Refactored LcmEventHelper, LcmEventProducer and LcmEventProducerHelper into 3 classes with clear responsibilities LcmEventProducer : can send events (depends on LcmObjectCreator) LcmEventObjectCreator : create events(depends on CmHandlePropertyChangeDetector) CmHandlePropertyChangeDetector : detects the updates - Cleaned up corresponding test classes - Renamed cps/events/EventsProducer.java to cps/events/EventProducer (singular, consistent with other classes) (apologies, this caused updates for 3/4 of the files in this commit) - Renamed dmi/EventProducer to dmi/DmiEventProducer to avoid clashes with main EventProducer (in some testware) Issue-ID: CPS-3072 Change-Id: I2126fed8a1d45c0360a777b4419103804c5ff9f2 Signed-off-by: ToineSiebelink --- .../resources/schemas/lcm/lcm-event-header-v1.json | 56 ----- .../controller/NetworkCmProxyControllerSpec.groovy | 4 +- .../data/async/DataOperationEventConsumer.java | 6 +- .../DmiAsyncRequestResponseEventConsumer.java | 6 +- .../impl/data/utils/DmiDataOperationsHelper.java | 6 +- .../subscription/cmavc/CmAvcEventConsumer.java | 6 +- .../{EventProducer.java => DmiEventProducer.java} | 9 +- .../ncmp/CmSubscriptionHandlerImpl.java | 6 +- ...CmHandleRegistrationServicePropertyHandler.java | 6 +- .../sync/lcm/CmHandlePropertyChangeDetector.java | 189 ++++++++++++++ ...derMapper.java => CmHandlePropertyUpdates.java} | 24 +- .../inventory/sync/lcm/LcmEventObjectCreator.java | 108 ++++++++ .../impl/inventory/sync/lcm/LcmEventProducer.java | 139 ++++++++++ .../lcm/LcmEventsCmHandleStateHandlerImpl.java | 8 +- .../impl/inventory/sync/lcm/LcmEventsHelper.java | 70 ------ .../impl/inventory/sync/lcm/LcmEventsProducer.java | 119 --------- .../sync/lcm/LcmEventsProducerHelper.java | 280 --------------------- .../ncmp/utils/events/InventoryEventProducer.java | 8 +- .../ncmp/impl/data/DmiDataOperationsSpec.groovy | 22 +- ...AsyncRequestResponseEventIntegrationSpec.groovy | 8 +- .../async/DataOperationEventConsumerSpec.groovy | 6 +- .../async/FilterStrategiesIntegrationSpec.groovy | 12 +- .../data/async/SerializationIntegrationSpec.groovy | 8 +- .../data/utils/DmiDataOperationsHelperSpec.groovy | 7 +- .../cmavc/CmAvcEventConsumerSpec.groovy | 6 +- .../subscription/dmi/EventProducerSpec.groovy | 12 +- .../ncmp/CmSubscriptionHandlerImplSpec.groovy | 12 +- ...leRegistrationServicePropertyHandlerSpec.groovy | 15 +- .../sync/lcm/CmHandleStateMonitorSpec.groovy | 14 +- ...roducerSpec.groovy => EventProducerSpec.groovy} | 19 +- ...pec.groovy => LcmEventObjectCreatorSpec.groovy} | 38 +-- .../inventory/sync/lcm/LcmEventProducerSpec.groovy | 87 +++++++ .../LcmEventsCmHandleStateHandlerImplSpec.groovy | 40 ++- .../sync/lcm/LcmEventsProducerSpec.groovy | 130 ---------- .../utils/events/InventoryEventProducerSpec.groovy | 9 +- .../cps/events/CpsDataUpdateEventsProducer.java | 4 +- .../{EventsProducer.java => EventProducer.java} | 8 +- ...roovy => CpsDataUpdateEventProducerSpec.groovy} | 12 +- ...roducerSpec.groovy => EventProducerSpec.groovy} | 12 +- 39 files changed, 681 insertions(+), 850 deletions(-) delete mode 100644 cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json rename cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/{EventProducer.java => DmiEventProducer.java} (93%) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyChangeDetector.java rename cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/{LcmEventHeaderMapper.java => CmHandlePropertyUpdates.java} (68%) create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreator.java create mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducer.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsHelper.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java delete mode 100644 cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelper.java rename cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/{EventsProducerSpec.groovy => EventProducerSpec.groovy} (85%) rename cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/{LcmEventsProducerHelperSpec.groovy => LcmEventObjectCreatorSpec.groovy} (85%) create mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducerSpec.groovy delete mode 100644 cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy rename cps-service/src/main/java/org/onap/cps/events/{EventsProducer.java => EventProducer.java} (97%) rename cps-service/src/test/groovy/org/onap/cps/events/{CpsDataUpdateEventsProducerSpec.groovy => CpsDataUpdateEventProducerSpec.groovy} (94%) rename cps-service/src/test/groovy/org/onap/cps/events/{EventsProducerSpec.groovy => EventProducerSpec.groovy} (94%) diff --git a/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json b/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json deleted file mode 100644 index 8c9922ef7e..0000000000 --- a/cps-ncmp-events/src/main/resources/schemas/lcm/lcm-event-header-v1.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - - "$schema": "https://json-schema.org/draft/2019-09/schema", - "$id": "urn:cps:org.onap.ncmp.cmhandle.lcm-event-header:v1", - "$ref": "#/definitions/LcmEventHeader", - - "definitions": { - "LcmEventHeader": { - "description": "The header for LCM event", - "type": "object", - "javaType" : "org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader", - "properties": { - "eventId": { - "description": "The unique id identifying the event", - "type": "string" - }, - "eventCorrelationId": { - "description": "The id identifying the event", - "type": "string" - }, - "eventTime": { - "description": "The timestamp when original event occurred", - "type": "string" - }, - "eventSource": { - "description": "The source of the event", - "type": "string" - }, - "eventType": { - "description": "The type of the event", - "type": "string" - }, - "eventSchema": { - "description": "The schema that this event adheres to", - "type": "string" - }, - "eventSchemaVersion": { - "description": "The version of the schema that this event adheres to", - "type": "string" - } - }, - "required": [ - "eventId", - "eventCorrelationId", - "eventTime", - "eventSource", - "eventType", - "eventSchema", - "eventSchemaVersion", - "event" - ], - "additionalProperties": false - } - - } -} diff --git a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy index edd487263e..fae7b9ab48 100644 --- a/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy +++ b/cps-ncmp-rest/src/test/groovy/org/onap/cps/ncmp/rest/controller/NetworkCmProxyControllerSpec.groovy @@ -33,7 +33,7 @@ import org.onap.cps.TestUtils import org.onap.cps.api.exceptions.DataValidationException import org.onap.cps.api.model.ModuleDefinition import org.onap.cps.api.model.ModuleReference -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.api.inventory.DataStoreSyncState import org.onap.cps.ncmp.api.inventory.models.CompositeState import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory @@ -143,7 +143,7 @@ class NetworkCmProxyControllerSpec extends Specification { } def cleanup() { - ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders() + ((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders() } def 'Get Resource Data from pass-through operational.'() { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java index 9b43837c2c..efdd579b61 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumer.java @@ -25,7 +25,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.onap.cps.events.EventsProducer; +import org.onap.cps.events.EventProducer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -39,7 +39,7 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class DataOperationEventConsumer { - private final EventsProducer eventsProducer; + private final EventProducer eventProducer; /** * Consume the DataOperation cloud event sent by producer to topic 'async-m2m.topic' @@ -58,6 +58,6 @@ public class DataOperationEventConsumer { dataOperationEventConsumerRecord.headers(), "ce_destination"); final String correlationId = KafkaHeaders.getParsedKafkaHeader( dataOperationEventConsumerRecord.headers(), "ce_correlationid"); - eventsProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value()); + eventProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value()); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java index 802e15aaa0..9f53f72e85 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/async/DmiAsyncRequestResponseEventConsumer.java @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data.async; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsProducer; +import org.onap.cps.events.EventProducer; import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -38,7 +38,7 @@ import org.springframework.stereotype.Component; @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class DmiAsyncRequestResponseEventConsumer { - private final EventsProducer eventsProducer; + private final EventProducer eventProducer; private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper; /** @@ -56,7 +56,7 @@ public class DmiAsyncRequestResponseEventConsumer { log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent); final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent = ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent); - eventsProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(), + eventProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(), ncmpAsyncRequestResponseEvent.getEventId(), ncmpAsyncRequestResponseEvent); } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java index 8edd21fefb..ea38e28a01 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelper.java @@ -34,7 +34,7 @@ import java.util.Set; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsProducer; +import org.onap.cps.events.EventProducer; import org.onap.cps.ncmp.api.NcmpResponseStatus; import org.onap.cps.ncmp.api.data.models.DataOperationDefinition; import org.onap.cps.ncmp.api.data.models.DataOperationRequest; @@ -141,10 +141,10 @@ public class DmiDataOperationsHelper { if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) { final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic, requestId, cmHandleIdsPerResponseCodesPerOperation); - final EventsProducer eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class); + final EventProducer eventProducer = CpsApplicationContext.getCpsBean(EventProducer.class); log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}", clientTopic, requestId, dataOperationCloudEvent.getId()); - eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent); + eventProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java index 3397df73f3..cb5dc2e2f8 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumer.java @@ -29,7 +29,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Headers; -import org.onap.cps.events.EventsProducer; +import org.onap.cps.events.EventProducer; import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent; import org.onap.cps.ncmp.impl.inventory.InventoryPersistence; import org.springframework.beans.factory.annotation.Value; @@ -52,7 +52,7 @@ public class CmAvcEventConsumer { @Value("${app.ncmp.avc.cm-events-topic}") private String cmEventsTopicName; - private final EventsProducer eventsProducer; + private final EventProducer eventProducer; private final CmAvcEventService cmAvcEventService; private final InventoryPersistence inventoryPersistence; @@ -76,7 +76,7 @@ public class CmAvcEventConsumer { final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key(); log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent); - eventsProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); + eventProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent); } private void processCmAvcEventChanges(final ConsumerRecord cmAvcEventAsConsumerRecord) { diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiEventProducer.java similarity index 93% rename from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java rename to cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiEventProducer.java index d1fccdbb12..19e08049ec 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/DmiEventProducer.java @@ -25,7 +25,7 @@ import static org.onap.cps.ncmp.events.NcmpEventDataSchema.SUBSCRIPTIONS_V1; import io.cloudevents.CloudEvent; import java.util.Map; import lombok.RequiredArgsConstructor; -import org.onap.cps.events.EventsProducer; +import org.onap.cps.events.EventProducer; import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent; import org.onap.cps.ncmp.utils.events.NcmpEvent; import org.springframework.beans.factory.annotation.Value; @@ -35,9 +35,9 @@ import org.springframework.stereotype.Component; @Component @RequiredArgsConstructor @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) -public class EventProducer { +public class DmiEventProducer { - private final EventsProducer eventsProducer; + private final EventProducer eventProducer; @Value("${app.ncmp.avc.cm-subscription-dmi-in}") private String dmiInEventTopic; @@ -52,7 +52,7 @@ public class EventProducer { */ public void send(final String subscriptionId, final String dmiPluginName, final String eventType, final DataJobSubscriptionDmiInEvent event) { - eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId, + eventProducer.sendCloudEvent(dmiInEventTopic, subscriptionId, toCloudEvent(eventType, event, subscriptionId, dmiPluginName)); } @@ -68,5 +68,4 @@ public class EventProducer { .asCloudEvent(); } - } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java index f4fc169c54..dcbbdcdad2 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImpl.java @@ -32,8 +32,8 @@ import java.util.Set; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector; +import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiEventProducer; import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper; -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer; import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus; import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent; import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService; @@ -52,7 +52,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService; private final DmiInEventMapper dmiInEventMapper; - private final EventProducer eventProducer; + private final DmiEventProducer dmiEventProducer; private final InventoryPersistence inventoryPersistence; private final AlternateIdMatcher alternateIdMatcher; @@ -134,7 +134,7 @@ public class CmSubscriptionHandlerImpl implements CmSubscriptionHandler { final DataJobSubscriptionDmiInEvent dmiInEvent; dmiInEvent = buildDmiInEvent(cmHandleIdsAndDataNodeSelectors, dataSelector); - eventProducer.send(subscriptionId, dmiServiceName, eventType, dmiInEvent); + dmiEventProducer.send(subscriptionId, dmiServiceName, eventType, dmiInEvent); } } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandler.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandler.java index dcba2753ff..a8cd66dc26 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandler.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandler.java @@ -51,7 +51,7 @@ import org.onap.cps.ncmp.api.inventory.models.CmHandleRegistrationResponse; import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; import org.onap.cps.ncmp.impl.inventory.sync.lcm.CmHandleTransitionPair; -import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsHelper; +import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventProducer; import org.onap.cps.ncmp.impl.utils.YangDataConverter; import org.onap.cps.utils.JsonObjectMapper; import org.springframework.beans.factory.annotation.Qualifier; @@ -70,7 +70,7 @@ public class CmHandleRegistrationServicePropertyHandler { private final AlternateIdChecker alternateIdChecker; @Qualifier("cmHandleIdPerAlternateId") private final IMap cmHandleIdPerAlternateId; - private final LcmEventsHelper lcmEventsHelper; + private final LcmEventProducer lcmEventProducer; /** * Iterates over incoming updatedNcmpServiceCmHandles and update the dataNodes based on the updated attributes. @@ -172,7 +172,7 @@ public class CmHandleRegistrationServicePropertyHandler { final YangModelCmHandle updatedYangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId); final CmHandleTransitionPair cmHandleTransitionPair = new CmHandleTransitionPair(currentYangModelCmHandle, updatedYangModelCmHandle); - lcmEventsHelper.sendLcmEventBatchAsynchronously(List.of(cmHandleTransitionPair)); + lcmEventProducer.sendLcmEventBatchAsynchronously(List.of(cmHandleTransitionPair)); } private void updateProperties(final DataNode existingCmHandleDataNode, final PropertyType propertyType, diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyChangeDetector.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyChangeDetector.java new file mode 100644 index 0000000000..e3cf1fb24d --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyChangeDetector.java @@ -0,0 +1,189 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.inventory.sync.lcm; + +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; +import org.onap.cps.ncmp.events.lcm.v1.Values; + +/** + * Utility class for examining and determining changes in CM handle properties. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class CmHandlePropertyChangeDetector { + + /** + * Determines property (update) values for creating a new CM handle. + * + * @param ncmpServiceCmHandle the CM handle being created + * @return CmHandlePropertyUpdates containing new values for the created CM handle + */ + static CmHandlePropertyUpdates determineUpdatesForCreate(final NcmpServiceCmHandle ncmpServiceCmHandle) { + final CmHandlePropertyUpdates cmHandlePropertyUpdates = new CmHandlePropertyUpdates(); + cmHandlePropertyUpdates.setNewValues(new Values()); + cmHandlePropertyUpdates.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(ncmpServiceCmHandle)); + cmHandlePropertyUpdates.getNewValues() + .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(ncmpServiceCmHandle)); + cmHandlePropertyUpdates.getNewValues() + .setCmHandleProperties(List.of(ncmpServiceCmHandle.getPublicProperties())); + return cmHandlePropertyUpdates; + } + + /** + * Determines property updates between current and target CM handle states. + * + * @param currentNcmpServiceCmHandle the current CM handle state + * @param targetNcmpServiceCmHandle the target CM handle state + * @return CmHandlePropertyUpdates containing old and new values for changed properties + */ + static CmHandlePropertyUpdates determineUpdates(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle) { + final boolean hasDataSyncFlagEnabledChanged = + hasDataSyncEnabledFlagChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); + final boolean hasCmHandleStateChanged = + hasCmHandleStateChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); + final boolean arePublicCmHandlePropertiesEqual = + arePublicCmHandlePropertiesEqual(currentNcmpServiceCmHandle.getPublicProperties(), + targetNcmpServiceCmHandle.getPublicProperties() + ); + + final CmHandlePropertyUpdates cmHandlePropertyUpdates = new CmHandlePropertyUpdates(); + + if (hasDataSyncFlagEnabledChanged || hasCmHandleStateChanged || (!arePublicCmHandlePropertiesEqual)) { + cmHandlePropertyUpdates.setOldValues(new Values()); + cmHandlePropertyUpdates.setNewValues(new Values()); + } else { + return cmHandlePropertyUpdates; + } + + if (hasDataSyncFlagEnabledChanged) { + setDataSyncEnabledFlag(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandlePropertyUpdates); + } + + if (hasCmHandleStateChanged) { + setCmHandleStateChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandlePropertyUpdates); + } + + if (!arePublicCmHandlePropertiesEqual) { + setPublicCmHandlePropertiesChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, + cmHandlePropertyUpdates); + } + + return cmHandlePropertyUpdates; + + } + + private static void setDataSyncEnabledFlag(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle, + final CmHandlePropertyUpdates cmHandlePropertyUpdates) { + cmHandlePropertyUpdates.getOldValues().setDataSyncEnabled(getDataSyncEnabledFlag(currentNcmpServiceCmHandle)); + cmHandlePropertyUpdates.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(targetNcmpServiceCmHandle)); + + } + + private static void setCmHandleStateChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle, + final CmHandlePropertyUpdates cmHandlePropertyUpdates) { + cmHandlePropertyUpdates.getOldValues() + .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(currentNcmpServiceCmHandle)); + cmHandlePropertyUpdates.getNewValues() + .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(targetNcmpServiceCmHandle)); + } + + private static void setPublicCmHandlePropertiesChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle, + final CmHandlePropertyUpdates cmHandlePropertyUpdates) { + + final Map> publicCmHandlePropertiesDifference = + getPublicCmHandlePropertiesDifference(currentNcmpServiceCmHandle.getPublicProperties(), + targetNcmpServiceCmHandle.getPublicProperties() + ); + cmHandlePropertyUpdates.getOldValues() + .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("oldValues"))); + cmHandlePropertyUpdates.getNewValues() + .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("newValues"))); + + } + + private static Values.CmHandleState mapCmHandleStateToLcmEventCmHandleState( + final NcmpServiceCmHandle ncmpServiceCmHandle) { + return Values.CmHandleState.fromValue(ncmpServiceCmHandle.getCompositeState().getCmHandleState().name()); + } + + private static Boolean getDataSyncEnabledFlag(final NcmpServiceCmHandle ncmpServiceCmHandle) { + return ncmpServiceCmHandle.getCompositeState().getDataSyncEnabled(); + } + + private static boolean hasDataSyncEnabledFlagChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle) { + final Boolean currentDataSyncFlag = currentNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled(); + final Boolean targetDataSyncFlag = targetNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled(); + + if (targetDataSyncFlag == null) { + return currentDataSyncFlag != null; + } + + return !targetDataSyncFlag.equals(currentDataSyncFlag); + } + + private static boolean hasCmHandleStateChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle) { + return targetNcmpServiceCmHandle.getCompositeState().getCmHandleState() + != currentNcmpServiceCmHandle.getCompositeState().getCmHandleState(); + } + + private static boolean arePublicCmHandlePropertiesEqual(final Map currentCmHandleProperties, + final Map targetCmHandleProperties) { + if (targetCmHandleProperties.size() != currentCmHandleProperties.size()) { + return false; + } + return targetCmHandleProperties.equals(currentCmHandleProperties); + } + + private static Map> getPublicCmHandlePropertiesDifference( + final Map currentCmHandleProperties, + final Map targetCmHandleProperties) { + final Map> oldAndNewPropertiesDifferenceMap = new HashMap<>(2); + + final MapDifference cmHandlePropertiesDifference = + Maps.difference(targetCmHandleProperties, currentCmHandleProperties); + + final Map oldValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnRight()); + final Map newValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnLeft()); + + cmHandlePropertiesDifference.entriesDiffering().keySet().forEach(cmHandlePropertyName -> { + oldValues.put(cmHandlePropertyName, currentCmHandleProperties.get(cmHandlePropertyName)); + newValues.put(cmHandlePropertyName, targetCmHandleProperties.get(cmHandlePropertyName)); + }); + + oldAndNewPropertiesDifferenceMap.put("oldValues", oldValues); + oldAndNewPropertiesDifferenceMap.put("newValues", newValues); + + return oldAndNewPropertiesDifferenceMap; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventHeaderMapper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyUpdates.java similarity index 68% rename from cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventHeaderMapper.java rename to cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyUpdates.java index 7395838306..87c1b8f8e3 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventHeaderMapper.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandlePropertyUpdates.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,17 +20,15 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm; -import org.mapstruct.Mapper; -import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; -import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; - -@Mapper(componentModel = "spring") -public interface LcmEventHeaderMapper { - - /** - * Mapper for converting incoming {@link LcmEvent} to outgoing {@link LcmEventHeader}. - */ - - LcmEventHeader toLcmEventHeader(LcmEvent lcmEvent); +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.onap.cps.ncmp.events.lcm.v1.Values; +@NoArgsConstructor +@Getter +@Setter +class CmHandlePropertyUpdates { + private Values oldValues; + private Values newValues; } diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreator.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreator.java new file mode 100644 index 0000000000..5c7b6fdf22 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreator.java @@ -0,0 +1,108 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.inventory.sync.lcm; + +import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETED; +import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.CREATE; +import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.DELETE; +import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.UPDATE; + +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; +import org.onap.cps.ncmp.events.lcm.v1.Event; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.impl.utils.EventDateTimeFormatter; +import org.springframework.stereotype.Service; + +/** + * LcmEventObjectCreator to create the events send by LcmEventProducer. + */ + +@Slf4j +@Service +@RequiredArgsConstructor +public class LcmEventObjectCreator { + + /** + * Create Lifecycle Management Event. + * + * @param currentNcmpServiceCmHandle current ncmp service cmhandle + * @param targetNcmpServiceCmHandle target ncmp service cmhandle + * @return Populated LcmEvent + */ + public LcmEvent createLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle) { + final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId(); + final LcmEventType lcmEventType = + determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); + final LcmEvent lcmEvent = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType); + final Event event = new Event(); + event.setCmHandleId(cmHandleId); + event.setAlternateId(targetNcmpServiceCmHandle.getAlternateId()); + event.setModuleSetTag(targetNcmpServiceCmHandle.getModuleSetTag()); + event.setDataProducerIdentifier(targetNcmpServiceCmHandle.getDataProducerIdentifier()); + final CmHandlePropertyUpdates cmHandlePropertyUpdates = + determineEventValues(lcmEventType, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); + event.setOldValues(cmHandlePropertyUpdates.getOldValues()); + event.setNewValues(cmHandlePropertyUpdates.getNewValues()); + lcmEvent.setEvent(event); + return lcmEvent; + } + + private static LcmEventType determineEventType(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle) { + + if (currentNcmpServiceCmHandle.getCompositeState() == null) { + return CREATE; + } else if (targetNcmpServiceCmHandle.getCompositeState().getCmHandleState() == DELETED) { + return DELETE; + } + return UPDATE; + } + + private static CmHandlePropertyUpdates determineEventValues(final LcmEventType lcmEventType, + final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle) { + if (CREATE == lcmEventType) { + return CmHandlePropertyChangeDetector.determineUpdatesForCreate(targetNcmpServiceCmHandle); + } + if (UPDATE == lcmEventType) { + return CmHandlePropertyChangeDetector.determineUpdates(currentNcmpServiceCmHandle, + targetNcmpServiceCmHandle); + } + return new CmHandlePropertyUpdates(); + } + + private LcmEvent createLcmEventWithHeaderDetails(final String eventCorrelationId, final LcmEventType lcmEventType) { + final LcmEvent lcmEvent = new LcmEvent(); + lcmEvent.setEventId(UUID.randomUUID().toString()); + lcmEvent.setEventCorrelationId(eventCorrelationId); + lcmEvent.setEventTime(EventDateTimeFormatter.getCurrentIsoFormattedDateTime()); + lcmEvent.setEventSource("org.onap.ncmp"); + lcmEvent.setEventType(lcmEventType.getEventType()); + lcmEvent.setEventSchema("org.onap.ncmp:cmhandle-lcm-event"); + lcmEvent.setEventSchemaVersion("1.0"); + return lcmEvent; + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducer.java new file mode 100644 index 0000000000..85532b3ab2 --- /dev/null +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducer.java @@ -0,0 +1,139 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.inventory.sync.lcm; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.events.EventProducer; +import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; +import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; +import org.onap.cps.ncmp.events.lcm.v1.Values; +import org.onap.cps.ncmp.impl.utils.YangDataConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.KafkaException; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +/** + * Producer service for sending Lifecycle Management (LCM) events. + * This service is responsible for creating and publishing LCM events to Kafka topics + * when CM handle state transitions occur. It supports asynchronous batch processing + * and includes metrics collection for monitoring event publishing performance. + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class LcmEventProducer { + + private static final Tag METRIC_TAG_METHOD = Tag.of("method", "sendLcmEvent"); + private static final Tag METRIC_TAG_CLASS = Tag.of("class", LcmEventProducer.class.getName()); + private final EventProducer eventProducer; + private final LcmEventObjectCreator lcmEventObjectCreator; + private final MeterRegistry meterRegistry; + + @Value("${app.lcm.events.topic:ncmp-events}") + private String topicName; + + @Value("${notification.enabled:true}") + private boolean notificationsEnabled; + + /** + * Sends LCM events in batches asynchronously for CM handle state transitions. + * This method processes a collection of CM handle transition pairs and sends + * corresponding LCM events to the configured Kafka topic. The processing is + * performed asynchronously using the "notificationExecutor" thread pool. + * + * @param cmHandleTransitionPairs Collection of pairs containing current and target + * CM handle states represented as YangModelCmHandle objects + */ + @Async("notificationExecutor") + public void sendLcmEventBatchAsynchronously(final Collection cmHandleTransitionPairs) { + cmHandleTransitionPairs.forEach(cmHandleTransitionPair -> sendLcmEvent( + YangDataConverter.toNcmpServiceCmHandle(cmHandleTransitionPair.currentYangModelCmHandle()), + YangDataConverter.toNcmpServiceCmHandle(cmHandleTransitionPair.targetYangModelCmHandle()) + )); + } + + /** + * Sends a single LCM event for a CM handle state transition. + * Creates an LCM event using the provided current and target CM handle states, + * publishes it to the configured Kafka topic, and records metrics for monitoring. + * Event publishing is conditional based on the notifications enabled configuration. + * + * @param currentNcmpServiceCmHandle The current state of the CM handle + * @param targetNcmpServiceCmHandle The target state of the CM handle + */ + private void sendLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle, + final NcmpServiceCmHandle targetNcmpServiceCmHandle) { + if (notificationsEnabled) { + final LcmEvent lcmEvent = lcmEventObjectCreator.createLcmEvent(currentNcmpServiceCmHandle, + targetNcmpServiceCmHandle); + final Timer.Sample timerSample = Timer.start(meterRegistry); + try { + final Map headersAsMap = extractHeadersAsMap(lcmEvent); + final String eventKey = currentNcmpServiceCmHandle.getCmHandleId(); + eventProducer.sendLegacyEvent(topicName, eventKey, headersAsMap, lcmEvent); + recordMetrics(lcmEvent, timerSample); + } catch (final KafkaException e) { + log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage()); + } + } else { + log.debug("Notifications disabled."); + } + } + + private Map extractHeadersAsMap(final LcmEvent lcmEvent) { + final Map headersAsMap = new HashMap<>(7); + headersAsMap.put("eventId", lcmEvent.getEventId()); + headersAsMap.put("eventCorrelationId", lcmEvent.getEventCorrelationId()); + headersAsMap.put("eventTime", lcmEvent.getEventTime()); + headersAsMap.put("eventSource", lcmEvent.getEventSource()); + headersAsMap.put("eventType", lcmEvent.getEventType()); + headersAsMap.put("eventSchema", lcmEvent.getEventSchema()); + headersAsMap.put("eventSchemaVersion", lcmEvent.getEventSchemaVersion()); + return headersAsMap; + } + + private void recordMetrics(final LcmEvent lcmEvent, final Timer.Sample timerSample) { + final List tags = new ArrayList<>(4); + tags.add(METRIC_TAG_CLASS); + tags.add(METRIC_TAG_METHOD); + tags.add(createCmHandleStateTag("oldCmHandleState", lcmEvent.getEvent().getOldValues())); + tags.add(createCmHandleStateTag("newCmHandleState", lcmEvent.getEvent().getNewValues())); + timerSample.stop(Timer.builder("cps.ncmp.lcm.events.send") + .description("Time taken to send a LCM event") + .tags(tags) + .register(meterRegistry)); + } + + private Tag createCmHandleStateTag(final String tageLabel, final Values values) { + return Tag.of(tageLabel, values.getCmHandleState().value()); + } + +} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java index 23faf53a92..37e1ae3a6f 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImpl.java @@ -48,7 +48,7 @@ import org.springframework.stereotype.Service; public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleStateHandler { private final InventoryPersistence inventoryPersistence; - private final LcmEventsHelper lcmEventsHelper; + private final LcmEventProducer lcmEventProducer; private final CmHandleStateMonitor cmHandleStateMonitor; @Override @@ -58,8 +58,10 @@ public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleState final Collection cmHandleTransitionPairs = prepareCmHandleTransitionBatch(targetCmHandleStatePerCmHandle); persistCmHandleBatch(cmHandleTransitionPairs); - lcmEventsHelper.sendLcmEventBatchAsynchronously(cmHandleTransitionPairs); - cmHandleStateMonitor.updateCmHandleStateMetrics(cmHandleTransitionPairs); + if (!cmHandleTransitionPairs.isEmpty()) { + lcmEventProducer.sendLcmEventBatchAsynchronously(cmHandleTransitionPairs); + cmHandleStateMonitor.updateCmHandleStateMetrics(cmHandleTransitionPairs); + } } @Override diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsHelper.java deleted file mode 100644 index 375a602521..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsHelper.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.inventory.sync.lcm; - -import java.util.Collection; -import lombok.RequiredArgsConstructor; -import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; -import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; -import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; -import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle; -import org.onap.cps.ncmp.impl.utils.YangDataConverter; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Service; - -@Service -@RequiredArgsConstructor -public class LcmEventsHelper { - - private final LcmEventsProducerHelper lcmEventsProducerHelper; - private final LcmEventsProducer lcmEventsProducer; - - /** - * Send LcmEvent in batches and in asynchronous manner. - * - * @param cmHandleTransitionPairs Pair of existing and modified cm handle represented as YangModelCmHandle - */ - @Async("notificationExecutor") - public void sendLcmEventBatchAsynchronously(final Collection cmHandleTransitionPairs) { - cmHandleTransitionPairs.forEach(cmHandleTransitionPair -> sendLcmEvent( - toNcmpServiceCmHandle(cmHandleTransitionPair.currentYangModelCmHandle()), - toNcmpServiceCmHandle(cmHandleTransitionPair.targetYangModelCmHandle()) - )); - } - - private void sendLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle) { - final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId(); - final LcmEventHeader lcmEventHeader = - lcmEventsProducerHelper.createLcmEventHeader(cmHandleId, currentNcmpServiceCmHandle, - targetNcmpServiceCmHandle - ); - final LcmEvent lcmEvent = - lcmEventsProducerHelper.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, - targetNcmpServiceCmHandle - ); - lcmEventsProducer.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader); - } - - private static NcmpServiceCmHandle toNcmpServiceCmHandle(final YangModelCmHandle yangModelCmHandle) { - return YangDataConverter.toNcmpServiceCmHandle(yangModelCmHandle); - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java deleted file mode 100644 index 333f0674e0..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducer.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.inventory.sync.lcm; - -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.Timer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.events.EventsProducer; -import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; -import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; -import org.onap.cps.ncmp.events.lcm.v1.Values; -import org.onap.cps.utils.JsonObjectMapper; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.KafkaException; -import org.springframework.stereotype.Service; - -/** - * LcmEventsProducer to call the producer and send on the dedicated topic. - */ - -@Slf4j -@Service -@RequiredArgsConstructor -public class LcmEventsProducer { - - private static final Tag TAG_METHOD = Tag.of("method", "sendLcmEvent"); - private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName()); - private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A"; - private final EventsProducer eventsProducer; - private final JsonObjectMapper jsonObjectMapper; - private final MeterRegistry meterRegistry; - - @Value("${app.lcm.events.topic:ncmp-events}") - private String topicName; - - @Value("${notification.enabled:true}") - private boolean notificationsEnabled; - - /** - * Sends an LCM event to the dedicated topic with optional notification headers. - * Capture and log KafkaException If an error occurs while sending the event to Kafka - * - * @param cmHandleId CM handle id associated with the LCM event - * @param lcmEvent The LCM event object to be sent - * @param lcmEventHeader Optional headers associated with the LCM event - */ - public void sendLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) { - if (notificationsEnabled) { - lcmEventHeader.setEventId(lcmEvent.getEventId()); - lcmEventHeader.setEventTime(lcmEvent.getEventTime()); - final Timer.Sample timerSample = Timer.start(meterRegistry); - try { - @SuppressWarnings("unchecked") - final Map lcmEventHeadersMap = - jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class); - eventsProducer.sendLegacyEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent); - } catch (final KafkaException e) { - log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage()); - } finally { - recordMetrics(lcmEvent, timerSample); - } - } else { - log.debug("Notifications disabled."); - } - } - - private void recordMetrics(final LcmEvent lcmEvent, final Timer.Sample timerSample) { - final List tags = new ArrayList<>(4); - tags.add(TAG_CLASS); - tags.add(TAG_METHOD); - - final String oldCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getOldValues()); - tags.add(Tag.of("oldCmHandleState", oldCmHandleState)); - - final String newCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getNewValues()); - tags.add(Tag.of("newCmHandleState", newCmHandleState)); - - timerSample.stop(Timer.builder("cps.ncmp.lcm.events.send") - .description("Time taken to send a LCM event") - .tags(tags) - .register(meterRegistry)); - } - - /** - * Extracts the CM handle state value from the given Values object. - * If the provided Values object or its CM handle state is null, returns a default value. - * - * @param values The Values object containing CM handle state information. - * @return The CM handle state value as a string, or a default value if null. - */ - private String extractCmHandleStateValue(final Values values) { - return (values != null && values.getCmHandleState() != null) - ? values.getCmHandleState().value() - : UNAVAILABLE_CM_HANDLE_STATE; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelper.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelper.java deleted file mode 100644 index 580a3a00b7..0000000000 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelper.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.inventory.sync.lcm; - -import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETED; -import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.CREATE; -import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.DELETE; -import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.UPDATE; - -import com.google.common.collect.MapDifference; -import com.google.common.collect.Maps; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.RequiredArgsConstructor; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; -import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle; -import org.onap.cps.ncmp.events.lcm.v1.Event; -import org.onap.cps.ncmp.events.lcm.v1.LcmEvent; -import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader; -import org.onap.cps.ncmp.events.lcm.v1.Values; -import org.onap.cps.ncmp.impl.utils.EventDateTimeFormatter; -import org.springframework.stereotype.Component; - -/** - * LcmEventsProducerHelper to create LcmEvent based on relevant operation. - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class LcmEventsProducerHelper { - - private final LcmEventHeaderMapper lcmEventHeaderMapper; - - /** - * Create Lifecycle Management Event. - * - * @param cmHandleId cm handle identifier - * @param currentNcmpServiceCmHandle current ncmp service cmhandle - * @param targetNcmpServiceCmHandle target ncmp service cmhandle - * @return Populated LcmEvent - */ - public LcmEvent createLcmEvent(final String cmHandleId, - final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle) { - final LcmEventType lcmEventType = - determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); - final LcmEvent lcmEvent = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType); - final Event event = new Event(); - event.setCmHandleId(cmHandleId); - event.setAlternateId(targetNcmpServiceCmHandle.getAlternateId()); - event.setModuleSetTag(targetNcmpServiceCmHandle.getModuleSetTag()); - event.setDataProducerIdentifier(targetNcmpServiceCmHandle.getDataProducerIdentifier()); - final CmHandleValuesHolder cmHandleValuesHolder = - determineEventValues(lcmEventType, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); - event.setOldValues(cmHandleValuesHolder.getOldValues()); - event.setNewValues(cmHandleValuesHolder.getNewValues()); - lcmEvent.setEvent(event); - return lcmEvent; - } - - /** - * Create Lifecycle Management Event Header. - * - * @param cmHandleId cm handle identifier - * @param currentNcmpServiceCmHandle current ncmp service cmhandle - * @param targetNcmpServiceCmHandle target ncmp service cmhandle - * @return Populated LcmEventHeader - */ - public LcmEventHeader createLcmEventHeader(final String cmHandleId, - final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle) { - final LcmEventType lcmEventType = - determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); - final LcmEvent lcmEventWithHeaderDetails = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType); - return lcmEventHeaderMapper.toLcmEventHeader(lcmEventWithHeaderDetails); - } - - private static LcmEventType determineEventType(final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle) { - - if (currentNcmpServiceCmHandle.getCompositeState() == null) { - return CREATE; - } else if (targetNcmpServiceCmHandle.getCompositeState().getCmHandleState() == DELETED) { - return DELETE; - } - return UPDATE; - } - - private static CmHandleValuesHolder determineEventValues(final LcmEventType lcmEventType, - final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle) { - if (CREATE == lcmEventType) { - return determineCreateEventValues(targetNcmpServiceCmHandle); - } else if (UPDATE == lcmEventType) { - return determineUpdateEventValues(targetNcmpServiceCmHandle, currentNcmpServiceCmHandle); - } - return new CmHandleValuesHolder(); - - } - - private LcmEvent createLcmEventWithHeaderDetails(final String eventCorrelationId, final LcmEventType lcmEventType) { - final LcmEvent lcmEvent = new LcmEvent(); - lcmEvent.setEventId(UUID.randomUUID().toString()); - lcmEvent.setEventCorrelationId(eventCorrelationId); - lcmEvent.setEventTime(EventDateTimeFormatter.getCurrentIsoFormattedDateTime()); - lcmEvent.setEventSource("org.onap.ncmp"); - lcmEvent.setEventType(lcmEventType.getEventType()); - lcmEvent.setEventSchema("org.onap.ncmp:cmhandle-lcm-event"); - lcmEvent.setEventSchemaVersion("1.0"); - return lcmEvent; - } - - - private static CmHandleValuesHolder determineCreateEventValues(final NcmpServiceCmHandle ncmpServiceCmHandle) { - final CmHandleValuesHolder cmHandleValuesHolder = new CmHandleValuesHolder(); - cmHandleValuesHolder.setNewValues(new Values()); - cmHandleValuesHolder.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(ncmpServiceCmHandle)); - cmHandleValuesHolder.getNewValues() - .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(ncmpServiceCmHandle)); - cmHandleValuesHolder.getNewValues().setCmHandleProperties(List.of(ncmpServiceCmHandle.getPublicProperties())); - return cmHandleValuesHolder; - } - - private static CmHandleValuesHolder determineUpdateEventValues(final NcmpServiceCmHandle targetNcmpServiceCmHandle, - final NcmpServiceCmHandle currentNcmpServiceCmHandle) { - final boolean hasDataSyncFlagEnabledChanged = - hasDataSyncEnabledFlagChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); - final boolean hasCmHandleStateChanged = - hasCmHandleStateChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle); - final boolean arePublicCmHandlePropertiesEqual = - arePublicCmHandlePropertiesEqual(currentNcmpServiceCmHandle.getPublicProperties(), - targetNcmpServiceCmHandle.getPublicProperties() - ); - - final CmHandleValuesHolder cmHandleValuesHolder = new CmHandleValuesHolder(); - - if (hasDataSyncFlagEnabledChanged || hasCmHandleStateChanged || (!arePublicCmHandlePropertiesEqual)) { - cmHandleValuesHolder.setOldValues(new Values()); - cmHandleValuesHolder.setNewValues(new Values()); - } else { - return cmHandleValuesHolder; - } - - if (hasDataSyncFlagEnabledChanged) { - setDataSyncEnabledFlag(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandleValuesHolder); - } - - if (hasCmHandleStateChanged) { - setCmHandleStateChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandleValuesHolder); - } - - if (!arePublicCmHandlePropertiesEqual) { - setPublicCmHandlePropertiesChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, - cmHandleValuesHolder); - } - - return cmHandleValuesHolder; - - } - - private static void setDataSyncEnabledFlag(final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle, - final CmHandleValuesHolder cmHandleValuesHolder) { - cmHandleValuesHolder.getOldValues().setDataSyncEnabled(getDataSyncEnabledFlag(currentNcmpServiceCmHandle)); - cmHandleValuesHolder.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(targetNcmpServiceCmHandle)); - - } - - private static void setCmHandleStateChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle, - final CmHandleValuesHolder cmHandleValuesHolder) { - cmHandleValuesHolder.getOldValues() - .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(currentNcmpServiceCmHandle)); - cmHandleValuesHolder.getNewValues() - .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(targetNcmpServiceCmHandle)); - } - - private static void setPublicCmHandlePropertiesChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle, - final CmHandleValuesHolder cmHandleValuesHolder) { - - final Map> publicCmHandlePropertiesDifference = - getPublicCmHandlePropertiesDifference(currentNcmpServiceCmHandle.getPublicProperties(), - targetNcmpServiceCmHandle.getPublicProperties() - ); - cmHandleValuesHolder.getOldValues() - .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("oldValues"))); - cmHandleValuesHolder.getNewValues() - .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("newValues"))); - - } - - private static Values.CmHandleState mapCmHandleStateToLcmEventCmHandleState( - final NcmpServiceCmHandle ncmpServiceCmHandle) { - return Values.CmHandleState.fromValue(ncmpServiceCmHandle.getCompositeState().getCmHandleState().name()); - } - - private static Boolean getDataSyncEnabledFlag(final NcmpServiceCmHandle ncmpServiceCmHandle) { - return ncmpServiceCmHandle.getCompositeState().getDataSyncEnabled(); - } - - private static boolean hasDataSyncEnabledFlagChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle) { - final Boolean currentDataSyncFlag = currentNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled(); - final Boolean targetDataSyncFlag = targetNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled(); - - if (targetDataSyncFlag == null) { - return currentDataSyncFlag != null; - } - - return !targetDataSyncFlag.equals(currentDataSyncFlag); - } - - private static boolean hasCmHandleStateChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle, - final NcmpServiceCmHandle targetNcmpServiceCmHandle) { - return targetNcmpServiceCmHandle.getCompositeState().getCmHandleState() - != currentNcmpServiceCmHandle.getCompositeState().getCmHandleState(); - } - - private static boolean arePublicCmHandlePropertiesEqual(final Map currentCmHandleProperties, - final Map targetCmHandleProperties) { - if (targetCmHandleProperties.size() != currentCmHandleProperties.size()) { - return false; - } - return targetCmHandleProperties.equals(currentCmHandleProperties); - } - - private static Map> getPublicCmHandlePropertiesDifference( - final Map currentCmHandleProperties, - final Map targetCmHandleProperties) { - final Map> oldAndNewPropertiesDifferenceMap = new HashMap<>(2); - - final MapDifference cmHandlePropertiesDifference = - Maps.difference(targetCmHandleProperties, currentCmHandleProperties); - - final Map oldValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnRight()); - final Map newValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnLeft()); - - cmHandlePropertiesDifference.entriesDiffering().keySet().forEach(cmHandlePropertyName -> { - oldValues.put(cmHandlePropertyName, currentCmHandleProperties.get(cmHandlePropertyName)); - newValues.put(cmHandlePropertyName, targetCmHandleProperties.get(cmHandlePropertyName)); - }); - - oldAndNewPropertiesDifferenceMap.put("oldValues", oldValues); - oldAndNewPropertiesDifferenceMap.put("newValues", newValues); - - return oldAndNewPropertiesDifferenceMap; - } - - @NoArgsConstructor - @Getter - @Setter - static class CmHandleValuesHolder { - private Values oldValues; - private Values newValues; - } -} diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java index 7d116cd030..d01ba4a85d 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/utils/events/InventoryEventProducer.java @@ -27,7 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import lombok.RequiredArgsConstructor; -import org.onap.cps.events.EventsProducer; +import org.onap.cps.events.EventProducer; import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc; import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent; import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data; @@ -38,7 +38,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class InventoryEventProducer { - private final EventsProducer eventsProducer; + private final EventProducer eventProducer; @Value("${app.ncmp.avc.inventory-events-topic}") private String ncmpInventoryEventsTopicName; @@ -61,7 +61,7 @@ public class InventoryEventProducer { .build() .asCloudEvent(); - eventsProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent); + eventProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent); } private AvcEvent buildAvcEvent(final String attributeName, @@ -84,4 +84,4 @@ public class InventoryEventProducer { extensions.put("correlationid", eventKey); return extensions; } -} \ No newline at end of file +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy index 2eaabeb5b7..a3b564666b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/DmiDataOperationsSpec.groovy @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data import com.fasterxml.jackson.databind.ObjectMapper -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.api.data.models.CmResourceAddress import org.onap.cps.ncmp.api.data.models.DataOperationRequest import org.onap.cps.ncmp.api.exceptions.CmHandleNotFoundException @@ -42,7 +42,6 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity -import org.springframework.test.context.ContextConfiguration import reactor.core.publisher.Mono import static org.onap.cps.ncmp.api.NcmpResponseStatus.UNKNOWN_ERROR @@ -56,8 +55,7 @@ import static org.onap.cps.ncmp.api.data.models.OperationType.UPDATE import static org.onap.cps.ncmp.impl.models.RequiredDmiService.DATA import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent -@SpringBootTest -@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext, DmiServiceAuthenticationProperties, DmiDataOperations, PolicyExecutor]) +@SpringBootTest(classes = [EventProducer, CpsApplicationContext, DmiServiceAuthenticationProperties, DmiDataOperations, PolicyExecutor]) class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def NO_TOPIC = null @@ -73,17 +71,17 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { DmiDataOperations objectUnderTest @SpringBean - EventsProducer eventsProducer = Stub() + EventProducer eventProducerStub = Stub() @SpringBean - PolicyExecutor policyExecutor = Mock() + PolicyExecutor mockPolicyExecutor = Mock() @SpringBean - AlternateIdMatcher alternateIdMatcher = Mock() + AlternateIdMatcher mockAlternateIdMatcher = Mock() def 'Get resource data for #expectedDataStore from DMI without topic #scenario.'() { given: 'a cm handle for #cmHandleId' - alternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId + mockAlternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId mockYangModelCmHandleRetrieval(additionalProperties) and: 'a positive response from DMI service when it is called with the expected parameters' def responseFromDmi = Mono.just(new ResponseEntity('{some-key:some-value}', HttpStatus.OK)) @@ -127,7 +125,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json') def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class) and: 'no valid cm handles are found for the request' - alternateIdMatcher.getCmHandleId(_) >> { throw new CmHandleNotFoundException('') } + mockAlternateIdMatcher.getCmHandleId(_) >> { throw new CmHandleNotFoundException('') } mockInventoryPersistence.getYangModelCmHandles(_) >> [] when: 'get resource data for group of cm handles is invoked' objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER) @@ -143,7 +141,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { dataOperationRequest.dataOperationDefinitions[0].cmHandleReferences = [cmHandleId] and: 'the sent cloud event will be captured' def actualDataOperationCloudEvent = null - eventsProducer.sendCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] } + eventProducerStub.sendCloudEvent('my-topic-name', 'my-request-id', _) >> {args -> actualDataOperationCloudEvent = args[2] } and: 'a DMI client request exception is thrown when DMI service is called' mockDmiRestClient.asynchronousPostOperation(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) } when: 'attempt to get resource data for group of cm handles is invoked' @@ -174,7 +172,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { def 'Write data for pass-through:running datastore in DMI.'() { given: 'a cm handle for #cmHandleId' mockYangModelCmHandleRetrieval([yangModelCmHandleProperty]) - alternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId + mockAlternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId and: 'a positive response from DMI service when it is called with the expected parameters' def expectedUrlTemplateParameters = new UrlTemplateParameters('myServiceName/dmi/v1/ch/{cmHandleId}/data/ds/{datastore}?resourceIdentifier={resourceIdentifier}', ['resourceIdentifier': resourceIdentifier, 'datastore': 'ncmp-datastore:passthrough-running', 'cmHandleId': cmHandleId]) def expectedJson = '{"operation":"' + expectedOperationInUrl + '","dataType":"some data type","data":"requestData","cmHandleProperties":{"prop1":"val1"},"moduleSetTag":""}' @@ -185,7 +183,7 @@ class DmiDataOperationsSpec extends DmiOperationsBaseSpec { then: 'the result is the response from the DMI service' assert result == responseFromDmi and: 'the permission was checked with the policy executor' - 1 * policyExecutor.checkPermission(_, operation, NO_AUTH_HEADER, resourceIdentifier, 'requestData' ) + 1 * mockPolicyExecutor.checkPermission(_, operation, NO_AUTH_HEADER, resourceIdentifier, 'requestData' ) where: 'the following operation is performed' operation || expectedOperationInUrl CREATE || 'create' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy index 906779c494..efd0055b69 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/CpsAsyncRequestResponseEventIntegrationSpec.groovy @@ -24,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer import org.mapstruct.factory.Mappers -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent import org.onap.cps.ncmp.utils.TestUtils @@ -37,14 +37,14 @@ import org.springframework.test.annotation.DirtiesContext import org.testcontainers.spock.Testcontainers import java.time.Duration -@SpringBootTest(classes = [EventsProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper]) +@SpringBootTest(classes = [EventProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper]) @Testcontainers @DirtiesContext class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBaseSpec { @SpringBean - EventsProducer cpsAsyncRequestResponseEventProducer = - new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer cpsAsyncRequestResponseEventProducer = + new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) @SpringBean diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy index 420da6ffb5..9c32ce9fe1 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/DataOperationEventConsumerSpec.groovy @@ -29,7 +29,7 @@ import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.header.internals.RecordHeaders -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent import org.onap.cps.ncmp.utils.TestUtils import org.onap.cps.ncmp.utils.events.MessagingBaseSpec @@ -45,13 +45,13 @@ import java.time.Duration import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent -@SpringBootTest(classes = [EventsProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper]) +@SpringBootTest(classes = [EventProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper]) @Testcontainers @DirtiesContext class DataOperationEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) @SpringBean DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy index 602a54e262..49287d7b13 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/FilterStrategiesIntegrationSpec.groovy @@ -21,7 +21,7 @@ package org.onap.cps.ncmp.impl.data.async import io.cloudevents.core.builder.CloudEventBuilder -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.config.KafkaConfig import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent import org.onap.cps.ncmp.utils.events.ConsumerBaseSpec @@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { @SpringBean - EventsProducer mockEventsProducer = Mock() + EventProducer mockEventProducer = Mock() @SpringBean NcmpAsyncRequestResponseEventMapper mapper = Stub() @@ -61,7 +61,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { then: 'wait a little for async processing of message (must wait to try to avoid false positives)' TimeUnit.MILLISECONDS.sleep(300) and: 'event is not consumed' - 0 * mockEventsProducer.sendLegacyEvent(*_) + 0 * mockEventProducer.sendLegacyEvent(*_) } def 'Legacy event consumer with valid legacy event.'() { @@ -70,7 +70,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { and: 'a flag to track the send event call' def sendEventMethodCalled = false and: 'the (mocked) events producer will use the flag to indicate if it is called' - mockEventsProducer.sendLegacyEvent(*_) >> { + mockEventProducer.sendLegacyEvent(*_) >> { sendEventMethodCalled = true } when: 'send the cloud event' @@ -90,7 +90,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { and: 'a flag to track the sent event call' def sendEventMethodCalled = false and: 'the (mocked) events producer will use the flag to indicate if it is called' - mockEventsProducer.sendCloudEvent(*_) >> { + mockEventProducer.sendCloudEvent(*_) >> { sendEventMethodCalled = true } when: 'send the cloud event' @@ -114,7 +114,7 @@ class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec { then: 'wait a little for async processing of message (must wait to try to avoid false positives)' TimeUnit.MILLISECONDS.sleep(300) and: 'the event is not processed by this consumer' - 0 * mockEventsProducer.sendCloudEvent(*_) + 0 * mockEventProducer.sendCloudEvent(*_) } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy index b08294583e..5e671dc0ab 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/async/SerializationIntegrationSpec.groovy @@ -22,7 +22,7 @@ package org.onap.cps.ncmp.impl.data.async import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.core.builder.CloudEventBuilder -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.config.KafkaConfig import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent @@ -46,7 +46,7 @@ import spock.util.concurrent.PollingConditions class SerializationIntegrationSpec extends ConsumerBaseSpec { @SpringBean - EventsProducer mockEventsProducer = Mock() + EventProducer mockEventProducer = Mock() @SpringBean NcmpAsyncRequestResponseEventMapper mapper = Stub() { toNcmpAsyncEvent(_) >> new NcmpAsyncRequestResponseEvent(eventId: 'my-event-id', eventTarget: 'some client topic')} @@ -63,7 +63,7 @@ class SerializationIntegrationSpec extends ConsumerBaseSpec { and: 'a flag to track the send cloud event call' def sendCloudEventMethodCalled = false and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the cloud event' - mockEventsProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> { + mockEventProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> { sendCloudEventMethodCalled = true } when: 'send the event' @@ -80,7 +80,7 @@ class SerializationIntegrationSpec extends ConsumerBaseSpec { and: 'a flag to track the send event call' def sendEventMethodCalled = false and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event' - mockEventsProducer.sendLegacyEvent(*_) >> { + mockEventProducer.sendLegacyEvent(*_) >> { sendEventMethodCalled = true } when: 'send the event' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy index b55959d5d8..0d6fda3b92 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/data/utils/DmiDataOperationsHelperSpec.groovy @@ -21,11 +21,10 @@ package org.onap.cps.ncmp.impl.data.utils import com.fasterxml.jackson.databind.ObjectMapper -import io.cloudevents.CloudEvent import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.KafkaConsumer -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.api.data.models.DataOperationRequest import org.onap.cps.ncmp.api.data.models.OperationType import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder @@ -46,7 +45,7 @@ import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent -@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext]) +@ContextConfiguration(classes = [EventProducer, CpsApplicationContext]) class DmiDataOperationsHelperSpec extends MessagingBaseSpec { def static clientTopic = 'my-topic-name' @@ -56,7 +55,7 @@ class DmiDataOperationsHelperSpec extends MessagingBaseSpec { JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) @SpringBean - EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) def 'Process per data operation request with #serviceName.'() { given: 'data operation request with 3 operations' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy index eedc961cd8..e8b860fbcd 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/cmavc/CmAvcEventConsumerSpec.groovy @@ -27,7 +27,7 @@ import io.cloudevents.kafka.CloudEventDeserializer import io.cloudevents.kafka.impl.KafkaHeaders import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.api.inventory.models.CompositeState import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent import org.onap.cps.ncmp.impl.inventory.InventoryPersistence @@ -45,13 +45,13 @@ import java.time.Duration import static org.onap.cps.ncmp.utils.TestUtils.getResourceFileContent import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent -@SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper]) +@SpringBootTest(classes = [EventProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper]) @Testcontainers @DirtiesContext class CmAvcEventConsumerSpec extends MessagingBaseSpec { @SpringBean - EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) def mockCmAvcEventService = Mock(CmAvcEventService) def mockInventoryPersistence = Mock(InventoryPersistence) diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducerSpec.groovy index 32f4eb8bd0..09e91fedfd 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/dmi/EventProducerSpec.groovy @@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.dmi import com.fasterxml.jackson.databind.ObjectMapper import io.cloudevents.CloudEvent import io.cloudevents.core.v1.CloudEventBuilder -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.config.CpsApplicationContext import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data @@ -31,16 +31,14 @@ import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscript import org.onap.cps.ncmp.utils.events.CloudEventMapper import org.onap.cps.utils.JsonObjectMapper import org.springframework.boot.test.context.SpringBootTest -import org.springframework.test.context.ContextConfiguration import spock.lang.Specification -@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CloudEventBuilder]) -@ContextConfiguration(classes = [CpsApplicationContext]) +@SpringBootTest(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper, CloudEventBuilder]) class EventProducerSpec extends Specification { - def mockEventsProducer = Mock(EventsProducer) + def mockEventProducer = Mock(EventProducer) - def objectUnderTest = new EventProducer(mockEventsProducer) + def objectUnderTest = new DmiEventProducer(mockEventProducer) def 'Create and Send Cm Notification Subscription DMI In Event'() { given: 'a cm subscription for a dmi plugin' @@ -53,7 +51,7 @@ class EventProducerSpec extends Specification { when: 'the event is sent' objectUnderTest.send(subscriptionId, dmiPluginName, eventType, dmiInEvent) then: 'the event contains the required attributes' - 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> { + 1 * mockEventProducer.sendCloudEvent(_, _, _) >> { args -> { assert args[0] == 'dmiplugin-test-topic' diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy index 14dc9067e3..a098375b51 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/datajobs/subscription/ncmp/CmSubscriptionHandlerImplSpec.groovy @@ -20,12 +20,9 @@ package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED -import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED - import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector +import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiEventProducer import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper -import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService import org.onap.cps.ncmp.impl.inventory.InventoryPersistence @@ -34,11 +31,14 @@ import org.onap.cps.ncmp.impl.utils.AlternateIdMatcher import org.onap.cps.ncmp.impl.utils.JexParser import spock.lang.Specification +import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED +import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED + class CmSubscriptionHandlerImplSpec extends Specification { def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService) def mockDmiInEventMapper = Mock(DmiInEventMapper) - def mockDmiInEventProducer = Mock(EventProducer) + def mockDmiInEventProducer = Mock(DmiEventProducer) def mockInventoryPersistence = Mock(InventoryPersistence) def mockAlternateIdMatcher = Mock(AlternateIdMatcher) @@ -272,4 +272,4 @@ class CmSubscriptionHandlerImplSpec extends Specification { def getFdn(dataNodeSelector) { return JexParser.extractFdnPrefix(dataNodeSelector).orElse("") } -} \ No newline at end of file +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandlerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandlerSpec.groovy index bf8d8519c4..dad0411b82 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandlerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/CmHandleRegistrationServicePropertyHandlerSpec.groovy @@ -35,8 +35,7 @@ import org.onap.cps.api.model.DataNode import org.onap.cps.impl.DataNodeBuilder import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle -import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsHelper -import org.onap.cps.utils.ContentType +import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventProducer import org.onap.cps.utils.JsonObjectMapper import org.slf4j.LoggerFactory import spock.lang.Specification @@ -56,9 +55,9 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification { def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper()) def mockAlternateIdChecker = Mock(AlternateIdChecker) def mockCmHandleIdPerAlternateId = Mock(IMap) - def mockLcmEventsHelper = Mock(LcmEventsHelper) + def mockLcmEventProducer = Mock(LcmEventProducer) - def objectUnderTest = new CmHandleRegistrationServicePropertyHandler(mockInventoryPersistence, mockCpsDataService, jsonObjectMapper, mockAlternateIdChecker, mockCmHandleIdPerAlternateId, mockLcmEventsHelper) + def objectUnderTest = new CmHandleRegistrationServicePropertyHandler(mockInventoryPersistence, mockCpsDataService, jsonObjectMapper, mockAlternateIdChecker, mockCmHandleIdPerAlternateId, mockLcmEventProducer) def logger = Spy(ListAppender) void setup() { @@ -253,7 +252,7 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification { then: 'the update node leaves method is invoked once with correct parameters' 1 * mockInventoryPersistence.updateCmHandleField('cmHandleId', 'data-producer-identifier', 'New Data Producer Identifier') and: 'LCM event is sent' - 1 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously({ cmHandleTransitionPairs -> + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously({cmHandleTransitionPairs -> assert cmHandleTransitionPairs[0].targetYangModelCmHandle.dataProducerIdentifier == 'New Data Producer Identifier' }) where: 'the following scenarios are used' @@ -272,7 +271,7 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification { then: 'the update node leaves method is not invoked' 0 * mockCpsDataService.updateNodeLeaves(*_) and: 'No LCM events are sent' - 0 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously(*_) + 0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_) and: 'debug information is logged' def loggingEvent = logger.list[0] assert loggingEvent.level == Level.DEBUG @@ -291,7 +290,7 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification { then: 'the update node leaves method is invoked once with correct parameters' 1 * mockInventoryPersistence.updateCmHandleField('cmHandleId', 'data-producer-identifier', 'newDataProducerIdentifier') and: 'LCM event is sent' - 1 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously( { cmHandleTransitionPairs -> + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously( {cmHandleTransitionPairs -> assert cmHandleTransitionPairs[0].targetYangModelCmHandle.dataProducerIdentifier == 'newDataProducerIdentifier' assert cmHandleTransitionPairs[0].currentYangModelCmHandle.dataProducerIdentifier == 'oldDataProducerIdentifier' }) @@ -307,7 +306,7 @@ class CmHandleRegistrationServicePropertyHandlerSpec extends Specification { then: 'the update node leaves method is not invoked' 0 * mockCpsDataService.updateNodeLeaves(*_) and: 'No LCM events are sent' - 0 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously(*_) + 0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_) and: 'warning is logged' def lastLoggingEvent = logger.list[0] assert lastLoggingEvent.level == Level.WARN diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandleStateMonitorSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandleStateMonitorSpec.groovy index b409a1a139..f1e3daf825 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandleStateMonitorSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/CmHandleStateMonitorSpec.groovy @@ -65,14 +65,14 @@ class CmHandleStateMonitorSpec extends Specification { 1 * mockCmHandlesByState.putIfAbsent("lockedCmHandlesCount", expectedValue) 1 * mockCmHandlesByState.putIfAbsent("deletingCmHandlesCount", expectedValue) where: - scenario | queryResult || expectedValue - 'query service returns zero cm handle id'| [] || 0 - 'query service returns 1 cm handle id' | ['someId'] || 1 + scenario | queryResult || expectedValue + 'query service returns zero cm handle id'| [] || 0 + 'query service returns 1 cm handle id' | ['someId'] || 1 } def 'Update cm handle state metric'() { - given: 'a collection of cm handle state pair' + given: 'a cm handle state pair' def cmHandleTransitionPair = new CmHandleTransitionPair(new YangModelCmHandle(compositeState: new CompositeState(cmHandleState: ADVISED)), new YangModelCmHandle(compositeState: new CompositeState(cmHandleState: READY)) ) @@ -99,9 +99,9 @@ class CmHandleStateMonitorSpec extends Specification { then: 'the new value is as expected' assert entryProcessingMap.get(key) == expectedValue where: 'the following data is used' - scenario | key || expectedValue - 'current value of count is zero'| 'zeroCmHandlesCount'|| 0 - 'current value of count is >0' | 'tenCmHandlesCount' || 9 + scenario | key || expectedValue + 'current value of count is zero'| 'zeroCmHandlesCount' || 0 + 'current value of count is >0' | 'tenCmHandlesCount' || 9 } def 'Applying increasing entry processor to a key on map'() { diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy similarity index 85% rename from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy rename to cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy index 5d974fe157..ea57d75b1e 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventsProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/EventProducerSpec.groovy @@ -23,7 +23,7 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm import com.fasterxml.jackson.databind.ObjectMapper import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.StringDeserializer -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.events.LegacyEvent import org.onap.cps.ncmp.events.lcm.v1.Event import org.onap.cps.ncmp.events.lcm.v1.LcmEvent @@ -42,19 +42,17 @@ import java.time.Duration @SpringBootTest(classes = [ObjectMapper, JsonObjectMapper]) @Testcontainers @DirtiesContext -class EventsProducerSpec extends MessagingBaseSpec { +class EventProducerSpec extends MessagingBaseSpec { def legacyEventKafkaConsumer = new KafkaConsumer(eventConsumerConfigProperties('ncmp-group', StringDeserializer)) - def testTopic = 'ncmp-events-test' @SpringBean - EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) + EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos) @Autowired JsonObjectMapper jsonObjectMapper - def 'Produce and Consume Event'() { given: 'event key and event data' def eventKey = 'lcm' @@ -86,20 +84,19 @@ class EventsProducerSpec extends MessagingBaseSpec { and: 'consumer has a subscription' legacyEventKafkaConsumer.subscribe([testTopic] as List) when: 'an event is sent' - eventsProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData) + eventProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData) and: 'topic is polled' def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500)) then: 'poll returns one record' assert records.size() == 1 and: 'record key matches the expected event key' - def record = records.iterator().next() - assert eventKey == record.key + assert eventKey == records[0].key and: 'record matches the expected event' def expectedJsonString = TestUtils.getResourceFileContent('expectedLcmEvent.json') def expectedLcmEvent = jsonObjectMapper.convertJsonString(expectedJsonString, LcmEvent.class) - assert expectedLcmEvent == jsonObjectMapper.convertJsonString(record.value, LcmEvent.class) + assert expectedLcmEvent == jsonObjectMapper.convertJsonString(records[0].value, LcmEvent.class) and: 'record header matches the expected parameters' - assert SerializationUtils.deserialize(record.headers().lastHeader('eventId').value()) == eventId - assert SerializationUtils.deserialize(record.headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId + assert SerializationUtils.deserialize(records[0].headers().lastHeader('eventId').value()) == eventId + assert SerializationUtils.deserialize(records[0].headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelperSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreatorSpec.groovy similarity index 85% rename from cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelperSpec.groovy rename to cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreatorSpec.groovy index dbff44c07b..e3a9edfea6 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerHelperSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventObjectCreatorSpec.groovy @@ -20,7 +20,6 @@ package org.onap.cps.ncmp.impl.inventory.sync.lcm -import org.mapstruct.factory.Mappers import org.onap.cps.ncmp.api.inventory.models.CmHandleState import org.onap.cps.ncmp.api.inventory.models.CompositeState import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle @@ -31,21 +30,17 @@ import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETING import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY -class LcmEventsProducerHelperSpec extends Specification { +class LcmEventObjectCreatorSpec extends Specification { - LcmEventHeaderMapper lcmEventsHeaderMapper = Mappers.getMapper(LcmEventHeaderMapper) - - def objectUnderTest = new LcmEventsProducerHelper(lcmEventsHeaderMapper) + def objectUnderTest = new LcmEventObjectCreator() def cmHandleId = 'test-cm-handle' def 'Map the LcmEvent for #operation'() { given: 'NCMP cm handle details with current and old properties' - def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: currentCmHandleState), - publicProperties: currentPublicProperties) - def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: targetCmHandleState), - publicProperties: targetPublicProperties) + def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: currentCmHandleState), publicProperties: currentPublicProperties) + def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: targetCmHandleState), publicProperties: targetPublicProperties) when: 'the lcm event is created' - def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) + def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) then: 'event header is mapped correctly' assert result.eventSource == 'org.onap.ncmp' assert result.eventCorrelationId == cmHandleId @@ -73,7 +68,7 @@ class LcmEventsProducerHelperSpec extends Specification { def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: READY), publicProperties: publicProperties) when: 'the lcm event is created' - def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) + def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) then: 'Properties are just the one which are same' assert result.event.oldValues == null assert result.event.newValues == null @@ -85,7 +80,7 @@ class LcmEventsProducerHelperSpec extends Specification { publicProperties: ['publicProperty1': 'value11', 'publicProperty2': 'value22']) def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, publicProperties: ['publicProperty1': 'value1', 'publicProperty2': 'value2']) when: 'the lcm event is created' - def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmhandle) + def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmhandle) then: 'event header is mapped correctly' assert result.eventSource == 'org.onap.ncmp' assert result.eventCorrelationId == cmHandleId @@ -106,7 +101,7 @@ class LcmEventsProducerHelperSpec extends Specification { def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: DELETING), publicProperties: ['publicProperty1': 'value1']) when: 'the lcm event is created' - def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) + def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) then: 'event header is mapped correctly' assert result.eventSource == 'org.onap.ncmp' assert result.eventCorrelationId == cmHandleId @@ -122,7 +117,7 @@ class LcmEventsProducerHelperSpec extends Specification { def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: currentDataSyncEnableFlag, cmHandleState: ADVISED)) def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: targetDataSyncEnableFlag, cmHandleState: READY)) when: 'the lcm event is created' - def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) + def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) then: 'event header is mapped correctly' assert result.eventSource == 'org.onap.ncmp' assert result.eventCorrelationId == cmHandleId @@ -150,7 +145,7 @@ class LcmEventsProducerHelperSpec extends Specification { def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: currentDataSyncEnableFlag, cmHandleState: ADVISED)) def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: targetDataSyncEnableFlag, cmHandleState: READY)) when: 'the lcm event is created' - def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) + def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) then: 'the data sync flag is not present in the event' assert result.event.oldValues.dataSyncEnabled == null assert result.event.newValues.dataSyncEnabled == null @@ -161,23 +156,12 @@ class LcmEventsProducerHelperSpec extends Specification { 'null to null' | null | null } - def 'Map the LcmEventHeader'() { - given: 'NCMP cm handle details with current and old details' - def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: ADVISED)) - def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: READY)) - when: 'the lcm event header is created' - def result = objectUnderTest.createLcmEventHeader(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) - then: 'the header field are populated' - assert result.eventCorrelationId == cmHandleId - assert result.eventId != null - } - def 'Map the LcmEvent for alternate ID, data producer identifier, and module set tag when they contain #scenario'() { given: 'NCMP cm handle details with current and old values for alternate ID, module set tag, and data producer identifier' def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, alternateId: currentAlternateId, moduleSetTag: currentModuleSetTag, dataProducerIdentifier: currentDataProducerIdentifier, compositeState: new CompositeState(dataSyncEnabled: false)) def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, alternateId: targetAlternateId, moduleSetTag: targetModuleSetTag, dataProducerIdentifier: targetDataProducerIdentifier, compositeState: new CompositeState(dataSyncEnabled: false)) when: 'the lcm event is created' - def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) + def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle) then: 'the alternate ID, module set tag, and data producer identifier are present or are an empty string in the payload' assert result.event.alternateId == targetAlternateId assert result.event.moduleSetTag == targetModuleSetTag diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducerSpec.groovy new file mode 100644 index 0000000000..bfcff01cc2 --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventProducerSpec.groovy @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.impl.inventory.sync.lcm + +import io.micrometer.core.instrument.Tag +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import org.onap.cps.events.EventProducer +import org.onap.cps.ncmp.api.inventory.models.CompositeState +import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle +import org.springframework.kafka.KafkaException +import spock.lang.Specification + +import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED +import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY + +class LcmEventProducerSpec extends Specification { + + def mockEventProducer = Mock(EventProducer) + def lcmEventObjectCreator = new LcmEventObjectCreator() + def meterRegistry = new SimpleMeterRegistry() + + def objectUnderTest = new LcmEventProducer(mockEventProducer, lcmEventObjectCreator, meterRegistry) + + def cmHandleTransitionPair = new CmHandleTransitionPair( + new YangModelCmHandle(id: 'ch-1', compositeState: new CompositeState(cmHandleState: ADVISED), additionalProperties: [], publicProperties: []), + new YangModelCmHandle(id: 'ch-1', compositeState: new CompositeState(cmHandleState: READY), additionalProperties: [], publicProperties: []) + ) + + def 'Create and send lcm event where notifications are #scenario.'() { + given: 'notificationsEnabled is #notificationsEnabled' + objectUnderTest.notificationsEnabled = notificationsEnabled + when: 'service is called to send a batch of lcm events' + objectUnderTest.sendLcmEventBatchAsynchronously([cmHandleTransitionPair]) + then: 'producer is called #expectedTimesMethodCalled times with correct identifiers' + expectedTimesMethodCalled * mockEventProducer.sendLegacyEvent(_, 'ch-1', _, _) >> { + args -> { + def eventHeaders = args[2] + assert UUID.fromString(eventHeaders.get('eventId')) != null + assert eventHeaders.get('eventCorrelationId') == 'ch-1' + } + } + and: 'metrics are recorded with correct tags' + def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer() + if (notificationsEnabled) { + assert timer.count() == 1 + assert timer.id.tags.containsAll(Tag.of('oldCmHandleState', 'ADVISED'), Tag.of('newCmHandleState', 'READY')) + } else { + assert timer == null + } + where: 'the following values are used' + scenario | notificationsEnabled || expectedTimesMethodCalled + 'enabled' | true || 1 + 'disabled' | false || 0 + } + + def 'Exception while sending message.'(){ + given: 'notifications are enabled' + objectUnderTest.notificationsEnabled = true + when: 'producer set to throw an exception' + mockEventProducer.sendLegacyEvent(*_) >> { throw new KafkaException('sending failed')} + and: 'attempt to send events' + objectUnderTest.sendLcmEventBatchAsynchronously([cmHandleTransitionPair]) + then: 'the exception is just logged and not bubbled up' + noExceptionThrown() + and: 'metrics are not recorded' + assert meterRegistry.find('cps.ncmp.lcm.events.send').timer() == null + } + +} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy index 827af6110b..f5c0fea9d7 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsCmHandleStateHandlerImplSpec.groovy @@ -56,12 +56,10 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { } def mockInventoryPersistence = Mock(InventoryPersistence) - def mockLcmEventsCreator = Mock(LcmEventsProducerHelper) - def mockLcmEventsProducer = Mock(LcmEventsProducer) + def mockLcmEventProducer = Mock(LcmEventProducer) def mockCmHandleStateMonitor = Mock(CmHandleStateMonitor) - def lcmEventsHelper = new LcmEventsHelper(mockLcmEventsCreator, mockLcmEventsProducer) - def objectUnderTest = new LcmEventsCmHandleStateHandlerImpl(mockInventoryPersistence, lcmEventsHelper, mockCmHandleStateMonitor) + def objectUnderTest = new LcmEventsCmHandleStateHandlerImpl(mockInventoryPersistence, mockLcmEventProducer, mockCmHandleStateMonitor) def cmHandleId = 'cmhandle-id-1' def currentCompositeState @@ -82,7 +80,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { assert loggingEvent.level == Level.DEBUG assert loggingEvent.formattedMessage == "${cmHandleId} is now in ${toCmHandleState} state" and: 'event service is called to send event' - 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _) + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) where: 'state change parameters are provided' stateChange | fromCmHandleState | toCmHandleState 'ADVISED to READY' | ADVISED | READY @@ -99,7 +97,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { then: 'CM-handle is saved using inventory persistence' 1 * mockInventoryPersistence.saveCmHandleBatch(List.of(yangModelCmHandle)) and: 'event service is called to send event' - 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _) + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) and: 'a log entry is written' assert getLogMessage(0) == "${cmHandleId} is now in ADVISED state" } @@ -115,7 +113,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { assert cmHandleStatePerCmHandleId.get(cmHandleId).lockReason.details == 'some lock details' }) and: 'event service is called to send event' - 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _) + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) and: 'a log entry is written' assert getLogMessage(0) == "${cmHandleId} is now in ADVISED state" } @@ -134,7 +132,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { assert cmHandleStatePerCmHandleId.get(cmHandleId).dataStores.operationalDataStore.dataStoreSyncState == DataStoreSyncState.NONE_REQUESTED }) and: 'event service is called to send event' - 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _) + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) and: 'a log entry is written' assert getLogMessage(0) == "${cmHandleId} is now in READY state" } @@ -150,7 +148,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { and: 'method to persist cm handle state is called once' 1 * mockInventoryPersistence.saveCmHandleStateBatch([(cmHandleId): yangModelCmHandle.compositeState]) and: 'the method to send Lcm event is called once' - 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _) + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) } def 'Update cmHandle state to DELETING to DELETED' (){ @@ -162,7 +160,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { then: 'the cm handle state is as expected' yangModelCmHandle.getCompositeState().getCmHandleState() == DELETED and: 'the method to send Lcm event is called once' - 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _) + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) } def 'No state change and no event to be sent'() { @@ -174,7 +172,7 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { 1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST) 1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP) and: 'no event will be sent' - 0 * mockLcmEventsProducer.sendLcmEvent(*_) + 0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_) } def 'Batch of new cm handles provided'() { @@ -188,8 +186,8 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { }) and: 'no state updates are persisted' 1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP) - and: 'event service is called to send events' - 2 * mockLcmEventsProducer.sendLcmEvent(_, _, _) + and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)' + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) and: 'two log entries are written' assert getLogMessage(0) == 'cmhandle1 is now in ADVISED state' assert getLogMessage(1) == 'cmhandle2 is now in ADVISED state' @@ -206,8 +204,8 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { }) and: 'no new handles are persisted' 1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST) - and: 'event service is called to send events' - 2 * mockLcmEventsProducer.sendLcmEvent(_, _, _) + and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)' + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) and: 'two log entries are written' assert getLogMessage(0) == 'cmhandle1 is now in READY state' assert getLogMessage(1) == 'cmhandle2 is now in DELETING state' @@ -222,8 +220,8 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { 1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP) and: 'no new handles are persisted' 1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST) - and: 'event service is called to send events' - 2 * mockLcmEventsProducer.sendLcmEvent(_, _, _) + and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)' + 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_) and: 'two log entries are written' assert getLogMessage(0) == 'cmhandle1 is now in DELETED state' assert getLogMessage(1) == 'cmhandle2 is now in DELETED state' @@ -239,35 +237,29 @@ class LcmEventsCmHandleStateHandlerImplSpec extends Specification { then: 'the exception is not handled' thrown(RuntimeException) and: 'no events are sent' - 0 * mockLcmEventsProducer.sendLcmEvent(_, _, _) + 0 * mockLcmEventProducer.sendLcmEvent(*_) and: 'no log entries are written' assert logAppender.list.empty } def setupBatch(type) { - def yangModelCmHandle1 = new YangModelCmHandle(id: 'cmhandle1', additionalProperties: [], publicProperties: []) def yangModelCmHandle2 = new YangModelCmHandle(id: 'cmhandle2', additionalProperties: [], publicProperties: []) - switch (type) { case 'NEW': return [yangModelCmHandle1, yangModelCmHandle2] - case 'DELETED': yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: READY) yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) return [(yangModelCmHandle1): DELETED, (yangModelCmHandle2): DELETED] - case 'UPDATE': yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED) yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) return [(yangModelCmHandle1): READY, (yangModelCmHandle2): DELETING] - case 'NO_CHANGE': yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED) yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY) return [(yangModelCmHandle1): ADVISED, (yangModelCmHandle2): READY] - default: throw new IllegalArgumentException("batch type '${type}' not recognized") } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy deleted file mode 100644 index 4bcb89aac3..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/impl/inventory/sync/lcm/LcmEventsProducerSpec.groovy +++ /dev/null @@ -1,130 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.impl.inventory.sync.lcm - -import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.ADVISED -import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.READY - -import io.micrometer.core.instrument.Tag -import io.micrometer.core.instrument.simple.SimpleMeterRegistry -import org.onap.cps.events.EventsProducer -import org.onap.cps.ncmp.events.lcm.v1.Event -import org.onap.cps.ncmp.events.lcm.v1.LcmEvent -import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader -import org.onap.cps.ncmp.events.lcm.v1.Values -import org.onap.cps.utils.JsonObjectMapper -import org.springframework.kafka.KafkaException -import spock.lang.Specification - -class LcmEventsProducerSpec extends Specification { - - def mockLcmEventsProducer = Mock(EventsProducer) - def mockJsonObjectMapper = Mock(JsonObjectMapper) - def meterRegistry = new SimpleMeterRegistry() - - def objectUnderTest = new LcmEventsProducer(mockLcmEventsProducer, mockJsonObjectMapper, meterRegistry) - - def 'Create and send lcm event where events are #scenario'() { - given: 'a cm handle id and Lcm Event' - def cmHandleId = 'test-cm-handle-id' - def eventId = UUID.randomUUID().toString() - def event = getEventWithCmHandleState(ADVISED, READY) - def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId) - and: 'we also have a lcm event header' - def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId) - and: 'notificationsEnabled is #notificationsEnabled and it will be true as default' - objectUnderTest.notificationsEnabled = notificationsEnabled - and: 'lcm event header is transformed to headers map' - mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId] - when: 'service is called to send lcm event' - objectUnderTest.sendLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader) - then: 'producer is called #expectedTimesMethodCalled times' - expectedTimesMethodCalled * mockLcmEventsProducer.sendLegacyEvent(_, cmHandleId, _, lcmEvent) >> { - args -> { - def eventHeaders = (args[2] as Map) - assert eventHeaders.containsKey('eventId') - assert eventHeaders.containsKey('eventCorrelationId') - assert eventHeaders.get('eventId') == eventId - assert eventHeaders.get('eventCorrelationId') == cmHandleId - } - } - and: 'metrics are recorded with correct tags' - def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer() - if (notificationsEnabled) { - assert timer != null - assert timer.count() == expectedTimesMethodCalled - def tags = timer.getId().getTags() - assert tags.containsAll(Tag.of('oldCmHandleState', ADVISED.value()), Tag.of('newCmHandleState', READY.value())) - } else { - assert timer == null - } - where: 'the following values are used' - scenario | notificationsEnabled || expectedTimesMethodCalled - 'enabled' | true || 1 - 'disabled' | false || 0 - } - - def 'Unable to send message'(){ - given: 'a cm handle id and Lcm Event and notification enabled' - def cmHandleId = 'test-cm-handle-id' - def eventId = UUID.randomUUID().toString() - and: 'event #event' - def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId) - def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId) - objectUnderTest.notificationsEnabled = true - when: 'producer set to throw an exception' - mockLcmEventsProducer.sendLegacyEvent(_, _, _, _) >> { throw new KafkaException('sending failed')} - and: 'an event is publised' - objectUnderTest.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader) - then: 'the exception is just logged and not bubbled up' - noExceptionThrown() - and: 'metrics are recorded with error tags' - def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer() - assert timer != null - assert timer.count() == 1 - def expectedTags = [Tag.of('oldCmHandleState', 'N/A'), Tag.of('newCmHandleState', 'N/A')] - def tags = timer.getId().getTags() - assert tags.containsAll(expectedTags) - where: 'the following values are used' - scenario | event - 'without values' | new Event() - 'without cm handle state' | getEvent() - } - - def getEvent() { - def event = new Event() - def values = new Values() - event.setOldValues(values) - event.setNewValues(values) - event - } - - def getEventWithCmHandleState(oldCmHandleState, newCmHandleState) { - def event = new Event() - def advisedCmHandleStateValues = new Values() - advisedCmHandleStateValues.setCmHandleState(oldCmHandleState) - event.setOldValues(advisedCmHandleStateValues) - def readyCmHandleStateValues = new Values() - readyCmHandleStateValues.setCmHandleState(newCmHandleState) - event.setNewValues(readyCmHandleStateValues) - return event - } -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy index 1ee936b76a..27519e675b 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/utils/events/InventoryEventProducerSpec.groovy @@ -21,8 +21,7 @@ package org.onap.cps.ncmp.utils.events import com.fasterxml.jackson.databind.ObjectMapper -import io.cloudevents.CloudEvent -import org.onap.cps.events.EventsProducer +import org.onap.cps.events.EventProducer import org.onap.cps.ncmp.config.CpsApplicationContext import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent @@ -32,8 +31,8 @@ import org.springframework.test.context.ContextConfiguration @ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper]) class InventoryEventProducerSpec extends MessagingBaseSpec { - def mockEventsProducer = Mock(EventsProducer) - def objectUnderTest = new InventoryEventProducer(mockEventsProducer) + def mockEventProducer = Mock(EventProducer) + def objectUnderTest = new InventoryEventProducer(mockEventProducer) def 'Send an attribute value change event'() { given: 'the event key' @@ -47,7 +46,7 @@ class InventoryEventProducerSpec extends MessagingBaseSpec { when: 'an attribute value change event is sent' objectUnderTest.sendAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue) then: 'the cloud event producer is invoked with the correct data' - 1 * mockEventsProducer.sendCloudEvent(_, someEventKey, + 1 * mockEventProducer.sendCloudEvent(_, someEventKey, cloudEvent -> { def actualAvcs = CloudEventMapper.toTargetEvent(cloudEvent, AvcEvent.class).data.attributeValueChange def expectedAvc = new Avc(attributeName: someAttributeName, diff --git a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java index 86a930596e..5cd4f31f7b 100644 --- a/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java +++ b/cps-service/src/main/java/org/onap/cps/events/CpsDataUpdateEventsProducer.java @@ -43,7 +43,7 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class CpsDataUpdateEventsProducer { - private final EventsProducer eventsProducer; + private final EventProducer eventProducer; private final CpsNotificationService cpsNotificationService; @@ -75,7 +75,7 @@ public class CpsDataUpdateEventsProducer { final CloudEvent cpsDataUpdatedEventAsCloudEvent = CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent) .extensions(extensions).build().asCloudEvent(); - eventsProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent); + eventProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent); } else { log.debug("State of Overall Notifications : {} and Cps Change Event Notifications : {}", notificationsEnabled, cpsChangeEventNotificationsEnabled); diff --git a/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java b/cps-service/src/main/java/org/onap/cps/events/EventProducer.java similarity index 97% rename from cps-service/src/main/java/org/onap/cps/events/EventsProducer.java rename to cps-service/src/main/java/org/onap/cps/events/EventProducer.java index aea8406857..b0fc551412 100644 --- a/cps-service/src/main/java/org/onap/cps/events/EventsProducer.java +++ b/cps-service/src/main/java/org/onap/cps/events/EventProducer.java @@ -36,18 +36,18 @@ import org.springframework.stereotype.Service; import org.springframework.util.SerializationUtils; /** - * EventsProducer to send events. + * EventProducer to send events. */ @Slf4j @Service @RequiredArgsConstructor -public class EventsProducer { +public class EventProducer { /** * KafkaTemplate for legacy (non-cloud) events. - * Note: Cloud events should be used. This will be addressed as part of .... + * Note: Cloud events should be used. This will be addressed as part of + * .... */ @Qualifier("legacyEventKafkaTemplate") private final KafkaTemplate legacyEventKafkaTemplate; diff --git a/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventProducerSpec.groovy similarity index 94% rename from cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy rename to cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventProducerSpec.groovy index 3bbed52aa7..1b3e2600a4 100644 --- a/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventsProducerSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/events/CpsDataUpdateEventProducerSpec.groovy @@ -39,17 +39,17 @@ import static org.onap.cps.events.model.EventPayload.Action.REMOVE import static org.onap.cps.events.model.EventPayload.Action.REPLACE @ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper]) -class CpsDataUpdateEventsProducerSpec extends Specification { +class CpsDataUpdateEventProducerSpec extends Specification { static def CREATE_ACTION = CREATE.value() static def REPLACE_ACTION = REPLACE.value() static def REMOVE_ACTION = REMOVE.value() - def mockEventsProducer = Mock(EventsProducer) + def mockEventProducer = Mock(EventProducer) def objectMapper = new ObjectMapper(); def mockCpsNotificationService = Mock(CpsNotificationService) - def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsProducer, mockCpsNotificationService) + def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventProducer, mockCpsNotificationService) def setup() { mockCpsNotificationService.isNotificationEnabled('dataspace01', 'anchor01') >> true @@ -66,7 +66,7 @@ class CpsDataUpdateEventsProducerSpec extends Specification { when: 'service is called to send data update event' objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, actionInRequest, OffsetDateTime.now()) then: 'the event contains the required attributes' - 1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> { + 1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> { args -> { def cpsDataUpdatedEvent = (args[2] as CloudEvent) @@ -99,7 +99,7 @@ class CpsDataUpdateEventsProducerSpec extends Specification { when: 'service is called to send data event' objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null) then: 'the event is sent' - 1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) + 1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) } def 'Enabling and disabling sending cps events.'() { @@ -114,7 +114,7 @@ class CpsDataUpdateEventsProducerSpec extends Specification { when: 'service is called to send data event' objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null) then: 'the event is only sent when all related flags are true' - expectedCallsToProducer * mockEventsProducer.sendCloudEvent(*_) + expectedCallsToProducer * mockEventProducer.sendCloudEvent(*_) where: 'the following flags are used' notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled || expectedCallsToProducer false | true | true || 0 diff --git a/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy b/cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy similarity index 94% rename from cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy rename to cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy index 8cd1cbd782..528a859e07 100644 --- a/cps-service/src/test/groovy/org/onap/cps/events/EventsProducerSpec.groovy +++ b/cps-service/src/test/groovy/org/onap/cps/events/EventProducerSpec.groovy @@ -29,8 +29,6 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.header.Headers -import org.apache.kafka.common.header.internals.RecordHeader -import org.apache.kafka.common.header.internals.RecordHeaders import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.SendResult @@ -39,7 +37,7 @@ import spock.lang.Specification import java.util.concurrent.CompletableFuture -class EventsProducerSpec extends Specification { +class EventProducerSpec extends Specification { def mockLegacyKafkaTemplate = Mock(KafkaTemplate) def mockCloudEventKafkaTemplate = Mock(KafkaTemplate) @@ -47,17 +45,17 @@ class EventsProducerSpec extends Specification { def logger = Spy(ListAppender) void setup() { - def setupLogger = ((Logger) LoggerFactory.getLogger(EventsProducer.class)) + def setupLogger = ((Logger) LoggerFactory.getLogger(EventProducer.class)) setupLogger.setLevel(Level.DEBUG) setupLogger.addAppender(logger) logger.start() } void cleanup() { - ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders() + ((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders() } - def objectUnderTest = new EventsProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos) + def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos) def 'Send Cloud Event'() { given: 'a successfully sent event' @@ -180,7 +178,7 @@ class EventsProducerSpec extends Specification { getProducerRecord() >> Mock(ProducerRecord) } def runtimeException = new RuntimeException('some runtime exception') - def logOutcomeMethod = EventsProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean) + def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean) logOutcomeMethod.accessible = true when: 'logging the outcome with throwKafkaException set to true' logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true) -- 2.16.6