+++ /dev/null
-{
-
- "$schema": "https://json-schema.org/draft/2019-09/schema",
- "$id": "urn:cps:org.onap.ncmp.cmhandle.lcm-event-header:v1",
- "$ref": "#/definitions/LcmEventHeader",
-
- "definitions": {
- "LcmEventHeader": {
- "description": "The header for LCM event",
- "type": "object",
- "javaType" : "org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader",
- "properties": {
- "eventId": {
- "description": "The unique id identifying the event",
- "type": "string"
- },
- "eventCorrelationId": {
- "description": "The id identifying the event",
- "type": "string"
- },
- "eventTime": {
- "description": "The timestamp when original event occurred",
- "type": "string"
- },
- "eventSource": {
- "description": "The source of the event",
- "type": "string"
- },
- "eventType": {
- "description": "The type of the event",
- "type": "string"
- },
- "eventSchema": {
- "description": "The schema that this event adheres to",
- "type": "string"
- },
- "eventSchemaVersion": {
- "description": "The version of the schema that this event adheres to",
- "type": "string"
- }
- },
- "required": [
- "eventId",
- "eventCorrelationId",
- "eventTime",
- "eventSource",
- "eventType",
- "eventSchema",
- "eventSchemaVersion",
- "event"
- ],
- "additionalProperties": false
- }
-
- }
-}
import org.onap.cps.api.exceptions.DataValidationException
import org.onap.cps.api.model.ModuleDefinition
import org.onap.cps.api.model.ModuleReference
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.api.inventory.DataStoreSyncState
import org.onap.cps.ncmp.api.inventory.models.CompositeState
import org.onap.cps.ncmp.api.inventory.models.LockReasonCategory
}
def cleanup() {
- ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
+ ((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders()
}
def 'Get Resource Data from pass-through operational.'() {
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class DataOperationEventConsumer {
- private final EventsProducer eventsProducer;
+ private final EventProducer eventProducer;
/**
* Consume the DataOperation cloud event sent by producer to topic 'async-m2m.topic'
dataOperationEventConsumerRecord.headers(), "ce_destination");
final String correlationId = KafkaHeaders.getParsedKafkaHeader(
dataOperationEventConsumerRecord.headers(), "ce_correlationid");
- eventsProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
+ eventProducer.sendCloudEvent(eventTarget, correlationId, dataOperationEventConsumerRecord.value());
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class DmiAsyncRequestResponseEventConsumer {
- private final EventsProducer eventsProducer;
+ private final EventProducer eventProducer;
private final NcmpAsyncRequestResponseEventMapper ncmpAsyncRequestResponseEventMapper;
/**
log.debug("Consuming event {} ...", dmiAsyncRequestResponseEvent);
final NcmpAsyncRequestResponseEvent ncmpAsyncRequestResponseEvent =
ncmpAsyncRequestResponseEventMapper.toNcmpAsyncEvent(dmiAsyncRequestResponseEvent);
- eventsProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
+ eventProducer.sendLegacyEvent(ncmpAsyncRequestResponseEvent.getEventTarget(),
ncmpAsyncRequestResponseEvent.getEventId(),
ncmpAsyncRequestResponseEvent);
}
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
import org.onap.cps.ncmp.api.NcmpResponseStatus;
import org.onap.cps.ncmp.api.data.models.DataOperationDefinition;
import org.onap.cps.ncmp.api.data.models.DataOperationRequest;
if (!cmHandleIdsPerResponseCodesPerOperation.isEmpty()) {
final CloudEvent dataOperationCloudEvent = DataOperationEventCreator.createDataOperationEvent(clientTopic,
requestId, cmHandleIdsPerResponseCodesPerOperation);
- final EventsProducer eventsProducer = CpsApplicationContext.getCpsBean(EventsProducer.class);
+ final EventProducer eventProducer = CpsApplicationContext.getCpsBean(EventProducer.class);
log.warn("sending error message to client topic: {} ,requestId: {}, data operation cloud event id: {}",
clientTopic, requestId, dataOperationCloudEvent.getId());
- eventsProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
+ eventProducer.sendCloudEvent(clientTopic, requestId, dataOperationCloudEvent);
}
}
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.springframework.beans.factory.annotation.Value;
@Value("${app.ncmp.avc.cm-events-topic}")
private String cmEventsTopicName;
- private final EventsProducer eventsProducer;
+ private final EventProducer eventProducer;
private final CmAvcEventService cmAvcEventService;
private final InventoryPersistence inventoryPersistence;
final String outgoingAvcEventKey = cmAvcEventAsConsumerRecord.key();
log.debug("Consuming AVC event with key : {} and value : {}", outgoingAvcEventKey, outgoingAvcEvent);
- eventsProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
+ eventProducer.sendCloudEventUsingEos(cmEventsTopicName, outgoingAvcEventKey, outgoingAvcEvent);
}
private void processCmAvcEventChanges(final ConsumerRecord<String, CloudEvent> cmAvcEventAsConsumerRecord) {
import io.cloudevents.CloudEvent;
import java.util.Map;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
import org.onap.cps.ncmp.utils.events.NcmpEvent;
import org.springframework.beans.factory.annotation.Value;
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
-public class EventProducer {
+public class DmiEventProducer {
- private final EventsProducer eventsProducer;
+ private final EventProducer eventProducer;
@Value("${app.ncmp.avc.cm-subscription-dmi-in}")
private String dmiInEventTopic;
*/
public void send(final String subscriptionId, final String dmiPluginName,
final String eventType, final DataJobSubscriptionDmiInEvent event) {
- eventsProducer.sendCloudEvent(dmiInEventTopic, subscriptionId,
+ eventProducer.sendCloudEvent(dmiInEventTopic, subscriptionId,
toCloudEvent(eventType, event, subscriptionId, dmiPluginName));
}
.asCloudEvent();
}
-
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector;
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiEventProducer;
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper;
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer;
import org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus;
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent;
import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService;
private final CmDataJobSubscriptionPersistenceService cmDataJobSubscriptionPersistenceService;
private final DmiInEventMapper dmiInEventMapper;
- private final EventProducer eventProducer;
+ private final DmiEventProducer dmiEventProducer;
private final InventoryPersistence inventoryPersistence;
private final AlternateIdMatcher alternateIdMatcher;
final DataJobSubscriptionDmiInEvent dmiInEvent;
dmiInEvent = buildDmiInEvent(cmHandleIdsAndDataNodeSelectors, dataSelector);
- eventProducer.send(subscriptionId, dmiServiceName, eventType, dmiInEvent);
+ dmiEventProducer.send(subscriptionId, dmiServiceName, eventType, dmiInEvent);
}
}
import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.onap.cps.ncmp.impl.inventory.sync.lcm.CmHandleTransitionPair;
-import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsHelper;
+import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventProducer;
import org.onap.cps.ncmp.impl.utils.YangDataConverter;
import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.beans.factory.annotation.Qualifier;
private final AlternateIdChecker alternateIdChecker;
@Qualifier("cmHandleIdPerAlternateId")
private final IMap<String, String> cmHandleIdPerAlternateId;
- private final LcmEventsHelper lcmEventsHelper;
+ private final LcmEventProducer lcmEventProducer;
/**
* Iterates over incoming updatedNcmpServiceCmHandles and update the dataNodes based on the updated attributes.
final YangModelCmHandle updatedYangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId);
final CmHandleTransitionPair cmHandleTransitionPair =
new CmHandleTransitionPair(currentYangModelCmHandle, updatedYangModelCmHandle);
- lcmEventsHelper.sendLcmEventBatchAsynchronously(List.of(cmHandleTransitionPair));
+ lcmEventProducer.sendLcmEventBatchAsynchronously(List.of(cmHandleTransitionPair));
}
private void updateProperties(final DataNode existingCmHandleDataNode, final PropertyType propertyType,
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.inventory.sync.lcm;
+
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
+
+/**
+ * Utility class for examining and determining changes in CM handle properties.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CmHandlePropertyChangeDetector {
+
+ /**
+ * Determines property (update) values for creating a new CM handle.
+ *
+ * @param ncmpServiceCmHandle the CM handle being created
+ * @return CmHandlePropertyUpdates containing new values for the created CM handle
+ */
+ static CmHandlePropertyUpdates determineUpdatesForCreate(final NcmpServiceCmHandle ncmpServiceCmHandle) {
+ final CmHandlePropertyUpdates cmHandlePropertyUpdates = new CmHandlePropertyUpdates();
+ cmHandlePropertyUpdates.setNewValues(new Values());
+ cmHandlePropertyUpdates.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(ncmpServiceCmHandle));
+ cmHandlePropertyUpdates.getNewValues()
+ .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(ncmpServiceCmHandle));
+ cmHandlePropertyUpdates.getNewValues()
+ .setCmHandleProperties(List.of(ncmpServiceCmHandle.getPublicProperties()));
+ return cmHandlePropertyUpdates;
+ }
+
+ /**
+ * Determines property updates between current and target CM handle states.
+ *
+ * @param currentNcmpServiceCmHandle the current CM handle state
+ * @param targetNcmpServiceCmHandle the target CM handle state
+ * @return CmHandlePropertyUpdates containing old and new values for changed properties
+ */
+ static CmHandlePropertyUpdates determineUpdates(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+ final boolean hasDataSyncFlagEnabledChanged =
+ hasDataSyncEnabledFlagChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
+ final boolean hasCmHandleStateChanged =
+ hasCmHandleStateChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
+ final boolean arePublicCmHandlePropertiesEqual =
+ arePublicCmHandlePropertiesEqual(currentNcmpServiceCmHandle.getPublicProperties(),
+ targetNcmpServiceCmHandle.getPublicProperties()
+ );
+
+ final CmHandlePropertyUpdates cmHandlePropertyUpdates = new CmHandlePropertyUpdates();
+
+ if (hasDataSyncFlagEnabledChanged || hasCmHandleStateChanged || (!arePublicCmHandlePropertiesEqual)) {
+ cmHandlePropertyUpdates.setOldValues(new Values());
+ cmHandlePropertyUpdates.setNewValues(new Values());
+ } else {
+ return cmHandlePropertyUpdates;
+ }
+
+ if (hasDataSyncFlagEnabledChanged) {
+ setDataSyncEnabledFlag(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandlePropertyUpdates);
+ }
+
+ if (hasCmHandleStateChanged) {
+ setCmHandleStateChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandlePropertyUpdates);
+ }
+
+ if (!arePublicCmHandlePropertiesEqual) {
+ setPublicCmHandlePropertiesChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle,
+ cmHandlePropertyUpdates);
+ }
+
+ return cmHandlePropertyUpdates;
+
+ }
+
+ private static void setDataSyncEnabledFlag(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+ final CmHandlePropertyUpdates cmHandlePropertyUpdates) {
+ cmHandlePropertyUpdates.getOldValues().setDataSyncEnabled(getDataSyncEnabledFlag(currentNcmpServiceCmHandle));
+ cmHandlePropertyUpdates.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(targetNcmpServiceCmHandle));
+
+ }
+
+ private static void setCmHandleStateChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+ final CmHandlePropertyUpdates cmHandlePropertyUpdates) {
+ cmHandlePropertyUpdates.getOldValues()
+ .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(currentNcmpServiceCmHandle));
+ cmHandlePropertyUpdates.getNewValues()
+ .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(targetNcmpServiceCmHandle));
+ }
+
+ private static void setPublicCmHandlePropertiesChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle,
+ final CmHandlePropertyUpdates cmHandlePropertyUpdates) {
+
+ final Map<String, Map<String, String>> publicCmHandlePropertiesDifference =
+ getPublicCmHandlePropertiesDifference(currentNcmpServiceCmHandle.getPublicProperties(),
+ targetNcmpServiceCmHandle.getPublicProperties()
+ );
+ cmHandlePropertyUpdates.getOldValues()
+ .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("oldValues")));
+ cmHandlePropertyUpdates.getNewValues()
+ .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("newValues")));
+
+ }
+
+ private static Values.CmHandleState mapCmHandleStateToLcmEventCmHandleState(
+ final NcmpServiceCmHandle ncmpServiceCmHandle) {
+ return Values.CmHandleState.fromValue(ncmpServiceCmHandle.getCompositeState().getCmHandleState().name());
+ }
+
+ private static Boolean getDataSyncEnabledFlag(final NcmpServiceCmHandle ncmpServiceCmHandle) {
+ return ncmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
+ }
+
+ private static boolean hasDataSyncEnabledFlagChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+ final Boolean currentDataSyncFlag = currentNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
+ final Boolean targetDataSyncFlag = targetNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
+
+ if (targetDataSyncFlag == null) {
+ return currentDataSyncFlag != null;
+ }
+
+ return !targetDataSyncFlag.equals(currentDataSyncFlag);
+ }
+
+ private static boolean hasCmHandleStateChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+ return targetNcmpServiceCmHandle.getCompositeState().getCmHandleState()
+ != currentNcmpServiceCmHandle.getCompositeState().getCmHandleState();
+ }
+
+ private static boolean arePublicCmHandlePropertiesEqual(final Map<String, String> currentCmHandleProperties,
+ final Map<String, String> targetCmHandleProperties) {
+ if (targetCmHandleProperties.size() != currentCmHandleProperties.size()) {
+ return false;
+ }
+ return targetCmHandleProperties.equals(currentCmHandleProperties);
+ }
+
+ private static Map<String, Map<String, String>> getPublicCmHandlePropertiesDifference(
+ final Map<String, String> currentCmHandleProperties,
+ final Map<String, String> targetCmHandleProperties) {
+ final Map<String, Map<String, String>> oldAndNewPropertiesDifferenceMap = new HashMap<>(2);
+
+ final MapDifference<String, String> cmHandlePropertiesDifference =
+ Maps.difference(targetCmHandleProperties, currentCmHandleProperties);
+
+ final Map<String, String> oldValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnRight());
+ final Map<String, String> newValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnLeft());
+
+ cmHandlePropertiesDifference.entriesDiffering().keySet().forEach(cmHandlePropertyName -> {
+ oldValues.put(cmHandlePropertyName, currentCmHandleProperties.get(cmHandlePropertyName));
+ newValues.put(cmHandlePropertyName, targetCmHandleProperties.get(cmHandlePropertyName));
+ });
+
+ oldAndNewPropertiesDifferenceMap.put("oldValues", oldValues);
+ oldAndNewPropertiesDifferenceMap.put("newValues", newValues);
+
+ return oldAndNewPropertiesDifferenceMap;
+ }
+
+}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.ncmp.impl.inventory.sync.lcm;
-import org.mapstruct.Mapper;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
-
-@Mapper(componentModel = "spring")
-public interface LcmEventHeaderMapper {
-
- /**
- * Mapper for converting incoming {@link LcmEvent} to outgoing {@link LcmEventHeader}.
- */
-
- LcmEventHeader toLcmEventHeader(LcmEvent lcmEvent);
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
+@NoArgsConstructor
+@Getter
+@Setter
+class CmHandlePropertyUpdates {
+ private Values oldValues;
+ private Values newValues;
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.inventory.sync.lcm;
+
+import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETED;
+import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.CREATE;
+import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.DELETE;
+import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.UPDATE;
+
+import java.util.UUID;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
+import org.onap.cps.ncmp.events.lcm.v1.Event;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.impl.utils.EventDateTimeFormatter;
+import org.springframework.stereotype.Service;
+
+/**
+ * LcmEventObjectCreator to create the events send by LcmEventProducer.
+ */
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class LcmEventObjectCreator {
+
+ /**
+ * Create Lifecycle Management Event.
+ *
+ * @param currentNcmpServiceCmHandle current ncmp service cmhandle
+ * @param targetNcmpServiceCmHandle target ncmp service cmhandle
+ * @return Populated LcmEvent
+ */
+ public LcmEvent createLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+ final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId();
+ final LcmEventType lcmEventType =
+ determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
+ final LcmEvent lcmEvent = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType);
+ final Event event = new Event();
+ event.setCmHandleId(cmHandleId);
+ event.setAlternateId(targetNcmpServiceCmHandle.getAlternateId());
+ event.setModuleSetTag(targetNcmpServiceCmHandle.getModuleSetTag());
+ event.setDataProducerIdentifier(targetNcmpServiceCmHandle.getDataProducerIdentifier());
+ final CmHandlePropertyUpdates cmHandlePropertyUpdates =
+ determineEventValues(lcmEventType, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
+ event.setOldValues(cmHandlePropertyUpdates.getOldValues());
+ event.setNewValues(cmHandlePropertyUpdates.getNewValues());
+ lcmEvent.setEvent(event);
+ return lcmEvent;
+ }
+
+ private static LcmEventType determineEventType(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+
+ if (currentNcmpServiceCmHandle.getCompositeState() == null) {
+ return CREATE;
+ } else if (targetNcmpServiceCmHandle.getCompositeState().getCmHandleState() == DELETED) {
+ return DELETE;
+ }
+ return UPDATE;
+ }
+
+ private static CmHandlePropertyUpdates determineEventValues(final LcmEventType lcmEventType,
+ final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+ if (CREATE == lcmEventType) {
+ return CmHandlePropertyChangeDetector.determineUpdatesForCreate(targetNcmpServiceCmHandle);
+ }
+ if (UPDATE == lcmEventType) {
+ return CmHandlePropertyChangeDetector.determineUpdates(currentNcmpServiceCmHandle,
+ targetNcmpServiceCmHandle);
+ }
+ return new CmHandlePropertyUpdates();
+ }
+
+ private LcmEvent createLcmEventWithHeaderDetails(final String eventCorrelationId, final LcmEventType lcmEventType) {
+ final LcmEvent lcmEvent = new LcmEvent();
+ lcmEvent.setEventId(UUID.randomUUID().toString());
+ lcmEvent.setEventCorrelationId(eventCorrelationId);
+ lcmEvent.setEventTime(EventDateTimeFormatter.getCurrentIsoFormattedDateTime());
+ lcmEvent.setEventSource("org.onap.ncmp");
+ lcmEvent.setEventType(lcmEventType.getEventType());
+ lcmEvent.setEventSchema("org.onap.ncmp:cmhandle-lcm-event");
+ lcmEvent.setEventSchemaVersion("1.0");
+ return lcmEvent;
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.inventory.sync.lcm;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.Timer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.events.EventProducer;
+import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
+import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
+import org.onap.cps.ncmp.events.lcm.v1.Values;
+import org.onap.cps.ncmp.impl.utils.YangDataConverter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.KafkaException;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+/**
+ * Producer service for sending Lifecycle Management (LCM) events.
+ * This service is responsible for creating and publishing LCM events to Kafka topics
+ * when CM handle state transitions occur. It supports asynchronous batch processing
+ * and includes metrics collection for monitoring event publishing performance.
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class LcmEventProducer {
+
+ private static final Tag METRIC_TAG_METHOD = Tag.of("method", "sendLcmEvent");
+ private static final Tag METRIC_TAG_CLASS = Tag.of("class", LcmEventProducer.class.getName());
+ private final EventProducer eventProducer;
+ private final LcmEventObjectCreator lcmEventObjectCreator;
+ private final MeterRegistry meterRegistry;
+
+ @Value("${app.lcm.events.topic:ncmp-events}")
+ private String topicName;
+
+ @Value("${notification.enabled:true}")
+ private boolean notificationsEnabled;
+
+ /**
+ * Sends LCM events in batches asynchronously for CM handle state transitions.
+ * This method processes a collection of CM handle transition pairs and sends
+ * corresponding LCM events to the configured Kafka topic. The processing is
+ * performed asynchronously using the "notificationExecutor" thread pool.
+ *
+ * @param cmHandleTransitionPairs Collection of pairs containing current and target
+ * CM handle states represented as YangModelCmHandle objects
+ */
+ @Async("notificationExecutor")
+ public void sendLcmEventBatchAsynchronously(final Collection<CmHandleTransitionPair> cmHandleTransitionPairs) {
+ cmHandleTransitionPairs.forEach(cmHandleTransitionPair -> sendLcmEvent(
+ YangDataConverter.toNcmpServiceCmHandle(cmHandleTransitionPair.currentYangModelCmHandle()),
+ YangDataConverter.toNcmpServiceCmHandle(cmHandleTransitionPair.targetYangModelCmHandle())
+ ));
+ }
+
+ /**
+ * Sends a single LCM event for a CM handle state transition.
+ * Creates an LCM event using the provided current and target CM handle states,
+ * publishes it to the configured Kafka topic, and records metrics for monitoring.
+ * Event publishing is conditional based on the notifications enabled configuration.
+ *
+ * @param currentNcmpServiceCmHandle The current state of the CM handle
+ * @param targetNcmpServiceCmHandle The target state of the CM handle
+ */
+ private void sendLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
+ final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
+ if (notificationsEnabled) {
+ final LcmEvent lcmEvent = lcmEventObjectCreator.createLcmEvent(currentNcmpServiceCmHandle,
+ targetNcmpServiceCmHandle);
+ final Timer.Sample timerSample = Timer.start(meterRegistry);
+ try {
+ final Map<String, Object> headersAsMap = extractHeadersAsMap(lcmEvent);
+ final String eventKey = currentNcmpServiceCmHandle.getCmHandleId();
+ eventProducer.sendLegacyEvent(topicName, eventKey, headersAsMap, lcmEvent);
+ recordMetrics(lcmEvent, timerSample);
+ } catch (final KafkaException e) {
+ log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage());
+ }
+ } else {
+ log.debug("Notifications disabled.");
+ }
+ }
+
+ private Map<String, Object> extractHeadersAsMap(final LcmEvent lcmEvent) {
+ final Map<String, Object> headersAsMap = new HashMap<>(7);
+ headersAsMap.put("eventId", lcmEvent.getEventId());
+ headersAsMap.put("eventCorrelationId", lcmEvent.getEventCorrelationId());
+ headersAsMap.put("eventTime", lcmEvent.getEventTime());
+ headersAsMap.put("eventSource", lcmEvent.getEventSource());
+ headersAsMap.put("eventType", lcmEvent.getEventType());
+ headersAsMap.put("eventSchema", lcmEvent.getEventSchema());
+ headersAsMap.put("eventSchemaVersion", lcmEvent.getEventSchemaVersion());
+ return headersAsMap;
+ }
+
+ private void recordMetrics(final LcmEvent lcmEvent, final Timer.Sample timerSample) {
+ final List<Tag> tags = new ArrayList<>(4);
+ tags.add(METRIC_TAG_CLASS);
+ tags.add(METRIC_TAG_METHOD);
+ tags.add(createCmHandleStateTag("oldCmHandleState", lcmEvent.getEvent().getOldValues()));
+ tags.add(createCmHandleStateTag("newCmHandleState", lcmEvent.getEvent().getNewValues()));
+ timerSample.stop(Timer.builder("cps.ncmp.lcm.events.send")
+ .description("Time taken to send a LCM event")
+ .tags(tags)
+ .register(meterRegistry));
+ }
+
+ private Tag createCmHandleStateTag(final String tageLabel, final Values values) {
+ return Tag.of(tageLabel, values.getCmHandleState().value());
+ }
+
+}
public class LcmEventsCmHandleStateHandlerImpl implements LcmEventsCmHandleStateHandler {
private final InventoryPersistence inventoryPersistence;
- private final LcmEventsHelper lcmEventsHelper;
+ private final LcmEventProducer lcmEventProducer;
private final CmHandleStateMonitor cmHandleStateMonitor;
@Override
final Collection<CmHandleTransitionPair> cmHandleTransitionPairs =
prepareCmHandleTransitionBatch(targetCmHandleStatePerCmHandle);
persistCmHandleBatch(cmHandleTransitionPairs);
- lcmEventsHelper.sendLcmEventBatchAsynchronously(cmHandleTransitionPairs);
- cmHandleStateMonitor.updateCmHandleStateMetrics(cmHandleTransitionPairs);
+ if (!cmHandleTransitionPairs.isEmpty()) {
+ lcmEventProducer.sendLcmEventBatchAsynchronously(cmHandleTransitionPairs);
+ cmHandleStateMonitor.updateCmHandleStateMetrics(cmHandleTransitionPairs);
+ }
}
@Override
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync.lcm;
-
-import java.util.Collection;
-import lombok.RequiredArgsConstructor;
-import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
-import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
-import org.onap.cps.ncmp.impl.utils.YangDataConverter;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Service;
-
-@Service
-@RequiredArgsConstructor
-public class LcmEventsHelper {
-
- private final LcmEventsProducerHelper lcmEventsProducerHelper;
- private final LcmEventsProducer lcmEventsProducer;
-
- /**
- * Send LcmEvent in batches and in asynchronous manner.
- *
- * @param cmHandleTransitionPairs Pair of existing and modified cm handle represented as YangModelCmHandle
- */
- @Async("notificationExecutor")
- public void sendLcmEventBatchAsynchronously(final Collection<CmHandleTransitionPair> cmHandleTransitionPairs) {
- cmHandleTransitionPairs.forEach(cmHandleTransitionPair -> sendLcmEvent(
- toNcmpServiceCmHandle(cmHandleTransitionPair.currentYangModelCmHandle()),
- toNcmpServiceCmHandle(cmHandleTransitionPair.targetYangModelCmHandle())
- ));
- }
-
- private void sendLcmEvent(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
- final String cmHandleId = targetNcmpServiceCmHandle.getCmHandleId();
- final LcmEventHeader lcmEventHeader =
- lcmEventsProducerHelper.createLcmEventHeader(cmHandleId, currentNcmpServiceCmHandle,
- targetNcmpServiceCmHandle
- );
- final LcmEvent lcmEvent =
- lcmEventsProducerHelper.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle,
- targetNcmpServiceCmHandle
- );
- lcmEventsProducer.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader);
- }
-
- private static NcmpServiceCmHandle toNcmpServiceCmHandle(final YangModelCmHandle yangModelCmHandle) {
- return YangDataConverter.toNcmpServiceCmHandle(yangModelCmHandle);
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync.lcm;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Tag;
-import io.micrometer.core.instrument.Timer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.events.EventsProducer;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
-import org.onap.cps.ncmp.events.lcm.v1.Values;
-import org.onap.cps.utils.JsonObjectMapper;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.kafka.KafkaException;
-import org.springframework.stereotype.Service;
-
-/**
- * LcmEventsProducer to call the producer and send on the dedicated topic.
- */
-
-@Slf4j
-@Service
-@RequiredArgsConstructor
-public class LcmEventsProducer {
-
- private static final Tag TAG_METHOD = Tag.of("method", "sendLcmEvent");
- private static final Tag TAG_CLASS = Tag.of("class", LcmEventsProducer.class.getName());
- private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A";
- private final EventsProducer eventsProducer;
- private final JsonObjectMapper jsonObjectMapper;
- private final MeterRegistry meterRegistry;
-
- @Value("${app.lcm.events.topic:ncmp-events}")
- private String topicName;
-
- @Value("${notification.enabled:true}")
- private boolean notificationsEnabled;
-
- /**
- * Sends an LCM event to the dedicated topic with optional notification headers.
- * Capture and log KafkaException If an error occurs while sending the event to Kafka
- *
- * @param cmHandleId CM handle id associated with the LCM event
- * @param lcmEvent The LCM event object to be sent
- * @param lcmEventHeader Optional headers associated with the LCM event
- */
- public void sendLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) {
- if (notificationsEnabled) {
- lcmEventHeader.setEventId(lcmEvent.getEventId());
- lcmEventHeader.setEventTime(lcmEvent.getEventTime());
- final Timer.Sample timerSample = Timer.start(meterRegistry);
- try {
- @SuppressWarnings("unchecked")
- final Map<String, Object> lcmEventHeadersMap =
- jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
- eventsProducer.sendLegacyEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
- } catch (final KafkaException e) {
- log.error("Unable to send message to topic : {} and cause : {}", topicName, e.getMessage());
- } finally {
- recordMetrics(lcmEvent, timerSample);
- }
- } else {
- log.debug("Notifications disabled.");
- }
- }
-
- private void recordMetrics(final LcmEvent lcmEvent, final Timer.Sample timerSample) {
- final List<Tag> tags = new ArrayList<>(4);
- tags.add(TAG_CLASS);
- tags.add(TAG_METHOD);
-
- final String oldCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getOldValues());
- tags.add(Tag.of("oldCmHandleState", oldCmHandleState));
-
- final String newCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getNewValues());
- tags.add(Tag.of("newCmHandleState", newCmHandleState));
-
- timerSample.stop(Timer.builder("cps.ncmp.lcm.events.send")
- .description("Time taken to send a LCM event")
- .tags(tags)
- .register(meterRegistry));
- }
-
- /**
- * Extracts the CM handle state value from the given Values object.
- * If the provided Values object or its CM handle state is null, returns a default value.
- *
- * @param values The Values object containing CM handle state information.
- * @return The CM handle state value as a string, or a default value if null.
- */
- private String extractCmHandleStateValue(final Values values) {
- return (values != null && values.getCmHandleState() != null)
- ? values.getCmHandleState().value()
- : UNAVAILABLE_CM_HANDLE_STATE;
- }
-}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync.lcm;
-
-import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETED;
-import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.CREATE;
-import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.DELETE;
-import static org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventType.UPDATE;
-
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
-import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle;
-import org.onap.cps.ncmp.events.lcm.v1.Event;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
-import org.onap.cps.ncmp.events.lcm.v1.Values;
-import org.onap.cps.ncmp.impl.utils.EventDateTimeFormatter;
-import org.springframework.stereotype.Component;
-
-/**
- * LcmEventsProducerHelper to create LcmEvent based on relevant operation.
- */
-@Slf4j
-@Component
-@RequiredArgsConstructor
-public class LcmEventsProducerHelper {
-
- private final LcmEventHeaderMapper lcmEventHeaderMapper;
-
- /**
- * Create Lifecycle Management Event.
- *
- * @param cmHandleId cm handle identifier
- * @param currentNcmpServiceCmHandle current ncmp service cmhandle
- * @param targetNcmpServiceCmHandle target ncmp service cmhandle
- * @return Populated LcmEvent
- */
- public LcmEvent createLcmEvent(final String cmHandleId,
- final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
- final LcmEventType lcmEventType =
- determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
- final LcmEvent lcmEvent = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType);
- final Event event = new Event();
- event.setCmHandleId(cmHandleId);
- event.setAlternateId(targetNcmpServiceCmHandle.getAlternateId());
- event.setModuleSetTag(targetNcmpServiceCmHandle.getModuleSetTag());
- event.setDataProducerIdentifier(targetNcmpServiceCmHandle.getDataProducerIdentifier());
- final CmHandleValuesHolder cmHandleValuesHolder =
- determineEventValues(lcmEventType, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
- event.setOldValues(cmHandleValuesHolder.getOldValues());
- event.setNewValues(cmHandleValuesHolder.getNewValues());
- lcmEvent.setEvent(event);
- return lcmEvent;
- }
-
- /**
- * Create Lifecycle Management Event Header.
- *
- * @param cmHandleId cm handle identifier
- * @param currentNcmpServiceCmHandle current ncmp service cmhandle
- * @param targetNcmpServiceCmHandle target ncmp service cmhandle
- * @return Populated LcmEventHeader
- */
- public LcmEventHeader createLcmEventHeader(final String cmHandleId,
- final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
- final LcmEventType lcmEventType =
- determineEventType(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
- final LcmEvent lcmEventWithHeaderDetails = createLcmEventWithHeaderDetails(cmHandleId, lcmEventType);
- return lcmEventHeaderMapper.toLcmEventHeader(lcmEventWithHeaderDetails);
- }
-
- private static LcmEventType determineEventType(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
-
- if (currentNcmpServiceCmHandle.getCompositeState() == null) {
- return CREATE;
- } else if (targetNcmpServiceCmHandle.getCompositeState().getCmHandleState() == DELETED) {
- return DELETE;
- }
- return UPDATE;
- }
-
- private static CmHandleValuesHolder determineEventValues(final LcmEventType lcmEventType,
- final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
- if (CREATE == lcmEventType) {
- return determineCreateEventValues(targetNcmpServiceCmHandle);
- } else if (UPDATE == lcmEventType) {
- return determineUpdateEventValues(targetNcmpServiceCmHandle, currentNcmpServiceCmHandle);
- }
- return new CmHandleValuesHolder();
-
- }
-
- private LcmEvent createLcmEventWithHeaderDetails(final String eventCorrelationId, final LcmEventType lcmEventType) {
- final LcmEvent lcmEvent = new LcmEvent();
- lcmEvent.setEventId(UUID.randomUUID().toString());
- lcmEvent.setEventCorrelationId(eventCorrelationId);
- lcmEvent.setEventTime(EventDateTimeFormatter.getCurrentIsoFormattedDateTime());
- lcmEvent.setEventSource("org.onap.ncmp");
- lcmEvent.setEventType(lcmEventType.getEventType());
- lcmEvent.setEventSchema("org.onap.ncmp:cmhandle-lcm-event");
- lcmEvent.setEventSchemaVersion("1.0");
- return lcmEvent;
- }
-
-
- private static CmHandleValuesHolder determineCreateEventValues(final NcmpServiceCmHandle ncmpServiceCmHandle) {
- final CmHandleValuesHolder cmHandleValuesHolder = new CmHandleValuesHolder();
- cmHandleValuesHolder.setNewValues(new Values());
- cmHandleValuesHolder.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(ncmpServiceCmHandle));
- cmHandleValuesHolder.getNewValues()
- .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(ncmpServiceCmHandle));
- cmHandleValuesHolder.getNewValues().setCmHandleProperties(List.of(ncmpServiceCmHandle.getPublicProperties()));
- return cmHandleValuesHolder;
- }
-
- private static CmHandleValuesHolder determineUpdateEventValues(final NcmpServiceCmHandle targetNcmpServiceCmHandle,
- final NcmpServiceCmHandle currentNcmpServiceCmHandle) {
- final boolean hasDataSyncFlagEnabledChanged =
- hasDataSyncEnabledFlagChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
- final boolean hasCmHandleStateChanged =
- hasCmHandleStateChanged(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle);
- final boolean arePublicCmHandlePropertiesEqual =
- arePublicCmHandlePropertiesEqual(currentNcmpServiceCmHandle.getPublicProperties(),
- targetNcmpServiceCmHandle.getPublicProperties()
- );
-
- final CmHandleValuesHolder cmHandleValuesHolder = new CmHandleValuesHolder();
-
- if (hasDataSyncFlagEnabledChanged || hasCmHandleStateChanged || (!arePublicCmHandlePropertiesEqual)) {
- cmHandleValuesHolder.setOldValues(new Values());
- cmHandleValuesHolder.setNewValues(new Values());
- } else {
- return cmHandleValuesHolder;
- }
-
- if (hasDataSyncFlagEnabledChanged) {
- setDataSyncEnabledFlag(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandleValuesHolder);
- }
-
- if (hasCmHandleStateChanged) {
- setCmHandleStateChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle, cmHandleValuesHolder);
- }
-
- if (!arePublicCmHandlePropertiesEqual) {
- setPublicCmHandlePropertiesChange(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle,
- cmHandleValuesHolder);
- }
-
- return cmHandleValuesHolder;
-
- }
-
- private static void setDataSyncEnabledFlag(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle,
- final CmHandleValuesHolder cmHandleValuesHolder) {
- cmHandleValuesHolder.getOldValues().setDataSyncEnabled(getDataSyncEnabledFlag(currentNcmpServiceCmHandle));
- cmHandleValuesHolder.getNewValues().setDataSyncEnabled(getDataSyncEnabledFlag(targetNcmpServiceCmHandle));
-
- }
-
- private static void setCmHandleStateChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle,
- final CmHandleValuesHolder cmHandleValuesHolder) {
- cmHandleValuesHolder.getOldValues()
- .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(currentNcmpServiceCmHandle));
- cmHandleValuesHolder.getNewValues()
- .setCmHandleState(mapCmHandleStateToLcmEventCmHandleState(targetNcmpServiceCmHandle));
- }
-
- private static void setPublicCmHandlePropertiesChange(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle,
- final CmHandleValuesHolder cmHandleValuesHolder) {
-
- final Map<String, Map<String, String>> publicCmHandlePropertiesDifference =
- getPublicCmHandlePropertiesDifference(currentNcmpServiceCmHandle.getPublicProperties(),
- targetNcmpServiceCmHandle.getPublicProperties()
- );
- cmHandleValuesHolder.getOldValues()
- .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("oldValues")));
- cmHandleValuesHolder.getNewValues()
- .setCmHandleProperties(List.of(publicCmHandlePropertiesDifference.get("newValues")));
-
- }
-
- private static Values.CmHandleState mapCmHandleStateToLcmEventCmHandleState(
- final NcmpServiceCmHandle ncmpServiceCmHandle) {
- return Values.CmHandleState.fromValue(ncmpServiceCmHandle.getCompositeState().getCmHandleState().name());
- }
-
- private static Boolean getDataSyncEnabledFlag(final NcmpServiceCmHandle ncmpServiceCmHandle) {
- return ncmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
- }
-
- private static boolean hasDataSyncEnabledFlagChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
- final Boolean currentDataSyncFlag = currentNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
- final Boolean targetDataSyncFlag = targetNcmpServiceCmHandle.getCompositeState().getDataSyncEnabled();
-
- if (targetDataSyncFlag == null) {
- return currentDataSyncFlag != null;
- }
-
- return !targetDataSyncFlag.equals(currentDataSyncFlag);
- }
-
- private static boolean hasCmHandleStateChanged(final NcmpServiceCmHandle currentNcmpServiceCmHandle,
- final NcmpServiceCmHandle targetNcmpServiceCmHandle) {
- return targetNcmpServiceCmHandle.getCompositeState().getCmHandleState()
- != currentNcmpServiceCmHandle.getCompositeState().getCmHandleState();
- }
-
- private static boolean arePublicCmHandlePropertiesEqual(final Map<String, String> currentCmHandleProperties,
- final Map<String, String> targetCmHandleProperties) {
- if (targetCmHandleProperties.size() != currentCmHandleProperties.size()) {
- return false;
- }
- return targetCmHandleProperties.equals(currentCmHandleProperties);
- }
-
- private static Map<String, Map<String, String>> getPublicCmHandlePropertiesDifference(
- final Map<String, String> currentCmHandleProperties,
- final Map<String, String> targetCmHandleProperties) {
- final Map<String, Map<String, String>> oldAndNewPropertiesDifferenceMap = new HashMap<>(2);
-
- final MapDifference<String, String> cmHandlePropertiesDifference =
- Maps.difference(targetCmHandleProperties, currentCmHandleProperties);
-
- final Map<String, String> oldValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnRight());
- final Map<String, String> newValues = new HashMap<>(cmHandlePropertiesDifference.entriesOnlyOnLeft());
-
- cmHandlePropertiesDifference.entriesDiffering().keySet().forEach(cmHandlePropertyName -> {
- oldValues.put(cmHandlePropertyName, currentCmHandleProperties.get(cmHandlePropertyName));
- newValues.put(cmHandlePropertyName, targetCmHandleProperties.get(cmHandlePropertyName));
- });
-
- oldAndNewPropertiesDifferenceMap.put("oldValues", oldValues);
- oldAndNewPropertiesDifferenceMap.put("newValues", newValues);
-
- return oldAndNewPropertiesDifferenceMap;
- }
-
- @NoArgsConstructor
- @Getter
- @Setter
- static class CmHandleValuesHolder {
- private Values oldValues;
- private Values newValues;
- }
-}
import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
-import org.onap.cps.events.EventsProducer;
+import org.onap.cps.events.EventProducer;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent;
import org.onap.cps.ncmp.events.avc.ncmp_to_client.Data;
@RequiredArgsConstructor
public class InventoryEventProducer {
- private final EventsProducer eventsProducer;
+ private final EventProducer eventProducer;
@Value("${app.ncmp.avc.inventory-events-topic}")
private String ncmpInventoryEventsTopicName;
.build()
.asCloudEvent();
- eventsProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
+ eventProducer.sendCloudEvent(ncmpInventoryEventsTopicName, eventKey, avcCloudEvent);
}
private AvcEvent buildAvcEvent(final String attributeName,
extensions.put("correlationid", eventKey);
return extensions;
}
-}
\ No newline at end of file
+}
package org.onap.cps.ncmp.impl.data
import com.fasterxml.jackson.databind.ObjectMapper
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.api.data.models.CmResourceAddress
import org.onap.cps.ncmp.api.data.models.DataOperationRequest
import org.onap.cps.ncmp.api.exceptions.CmHandleNotFoundException
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
-import org.springframework.test.context.ContextConfiguration
import reactor.core.publisher.Mono
import static org.onap.cps.ncmp.api.NcmpResponseStatus.UNKNOWN_ERROR
import static org.onap.cps.ncmp.impl.models.RequiredDmiService.DATA
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-@SpringBootTest
-@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext, DmiServiceAuthenticationProperties, DmiDataOperations, PolicyExecutor])
+@SpringBootTest(classes = [EventProducer, CpsApplicationContext, DmiServiceAuthenticationProperties, DmiDataOperations, PolicyExecutor])
class DmiDataOperationsSpec extends DmiOperationsBaseSpec {
def NO_TOPIC = null
DmiDataOperations objectUnderTest
@SpringBean
- EventsProducer eventsProducer = Stub()
+ EventProducer eventProducerStub = Stub()
@SpringBean
- PolicyExecutor policyExecutor = Mock()
+ PolicyExecutor mockPolicyExecutor = Mock()
@SpringBean
- AlternateIdMatcher alternateIdMatcher = Mock()
+ AlternateIdMatcher mockAlternateIdMatcher = Mock()
def 'Get resource data for #expectedDataStore from DMI without topic #scenario.'() {
given: 'a cm handle for #cmHandleId'
- alternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId
+ mockAlternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId
mockYangModelCmHandleRetrieval(additionalProperties)
and: 'a positive response from DMI service when it is called with the expected parameters'
def responseFromDmi = Mono.just(new ResponseEntity<Object>('{some-key:some-value}', HttpStatus.OK))
def dataOperationBatchRequestJsonData = TestUtils.getResourceFileContent('dataOperationRequest.json')
def dataOperationRequest = spiedJsonObjectMapper.convertJsonString(dataOperationBatchRequestJsonData, DataOperationRequest.class)
and: 'no valid cm handles are found for the request'
- alternateIdMatcher.getCmHandleId(_) >> { throw new CmHandleNotFoundException('') }
+ mockAlternateIdMatcher.getCmHandleId(_) >> { throw new CmHandleNotFoundException('') }
mockInventoryPersistence.getYangModelCmHandles(_) >> []
when: 'get resource data for group of cm handles is invoked'
objectUnderTest.requestResourceDataFromDmi('my-topic-name', dataOperationRequest, 'requestId', NO_AUTH_HEADER)
dataOperationRequest.dataOperationDefinitions[0].cmHandleReferences = [cmHandleId]
and: 'the sent cloud event will be captured'
def actualDataOperationCloudEvent = null
- eventsProducer.sendCloudEvent('my-topic-name', 'my-request-id', _) >> { args -> actualDataOperationCloudEvent = args[2] }
+ eventProducerStub.sendCloudEvent('my-topic-name', 'my-request-id', _) >> {args -> actualDataOperationCloudEvent = args[2] }
and: 'a DMI client request exception is thrown when DMI service is called'
mockDmiRestClient.asynchronousPostOperation(*_) >> { Mono.error(new DmiClientRequestException(123, '', '', UNKNOWN_ERROR)) }
when: 'attempt to get resource data for group of cm handles is invoked'
def 'Write data for pass-through:running datastore in DMI.'() {
given: 'a cm handle for #cmHandleId'
mockYangModelCmHandleRetrieval([yangModelCmHandleProperty])
- alternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId
+ mockAlternateIdMatcher.getCmHandleId(cmHandleId) >> cmHandleId
and: 'a positive response from DMI service when it is called with the expected parameters'
def expectedUrlTemplateParameters = new UrlTemplateParameters('myServiceName/dmi/v1/ch/{cmHandleId}/data/ds/{datastore}?resourceIdentifier={resourceIdentifier}', ['resourceIdentifier': resourceIdentifier, 'datastore': 'ncmp-datastore:passthrough-running', 'cmHandleId': cmHandleId])
def expectedJson = '{"operation":"' + expectedOperationInUrl + '","dataType":"some data type","data":"requestData","cmHandleProperties":{"prop1":"val1"},"moduleSetTag":""}'
then: 'the result is the response from the DMI service'
assert result == responseFromDmi
and: 'the permission was checked with the policy executor'
- 1 * policyExecutor.checkPermission(_, operation, NO_AUTH_HEADER, resourceIdentifier, 'requestData' )
+ 1 * mockPolicyExecutor.checkPermission(_, operation, NO_AUTH_HEADER, resourceIdentifier, 'requestData' )
where: 'the following operation is performed'
operation || expectedOperationInUrl
CREATE || 'create'
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.mapstruct.factory.Mappers
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.testcontainers.spock.Testcontainers
import java.time.Duration
-@SpringBootTest(classes = [EventsProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventProducer, DmiAsyncRequestResponseEventConsumer, ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
class NcmpAsyncRequestResponseEventProducerIntegrationSpec extends MessagingBaseSpec {
@SpringBean
- EventsProducer cpsAsyncRequestResponseEventProducer =
- new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer cpsAsyncRequestResponseEventProducer =
+ new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
@SpringBean
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.header.internals.RecordHeaders
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.events.async1_0_0.DataOperationEvent
import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-@SpringBootTest(classes = [EventsProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
+@SpringBootTest(classes = [EventProducer, DataOperationEventConsumer, RecordFilterStrategies, JsonObjectMapper, ObjectMapper])
@Testcontainers
@DirtiesContext
class DataOperationEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsProducer asyncDataOperationEventProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer asyncDataOperationEventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
@SpringBean
DataOperationEventConsumer objectUnderTest = new DataOperationEventConsumer(asyncDataOperationEventProducer)
package org.onap.cps.ncmp.impl.data.async
import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.config.KafkaConfig
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
import org.onap.cps.ncmp.utils.events.ConsumerBaseSpec
class FilterStrategiesIntegrationSpec extends ConsumerBaseSpec {
@SpringBean
- EventsProducer mockEventsProducer = Mock()
+ EventProducer mockEventProducer = Mock()
@SpringBean
NcmpAsyncRequestResponseEventMapper mapper = Stub()
then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
TimeUnit.MILLISECONDS.sleep(300)
and: 'event is not consumed'
- 0 * mockEventsProducer.sendLegacyEvent(*_)
+ 0 * mockEventProducer.sendLegacyEvent(*_)
}
def 'Legacy event consumer with valid legacy event.'() {
and: 'a flag to track the send event call'
def sendEventMethodCalled = false
and: 'the (mocked) events producer will use the flag to indicate if it is called'
- mockEventsProducer.sendLegacyEvent(*_) >> {
+ mockEventProducer.sendLegacyEvent(*_) >> {
sendEventMethodCalled = true
}
when: 'send the cloud event'
and: 'a flag to track the sent event call'
def sendEventMethodCalled = false
and: 'the (mocked) events producer will use the flag to indicate if it is called'
- mockEventsProducer.sendCloudEvent(*_) >> {
+ mockEventProducer.sendCloudEvent(*_) >> {
sendEventMethodCalled = true
}
when: 'send the cloud event'
then: 'wait a little for async processing of message (must wait to try to avoid false positives)'
TimeUnit.MILLISECONDS.sleep(300)
and: 'the event is not processed by this consumer'
- 0 * mockEventsProducer.sendCloudEvent(*_)
+ 0 * mockEventProducer.sendCloudEvent(*_)
}
}
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.core.builder.CloudEventBuilder
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.config.KafkaConfig
import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent
import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent
class SerializationIntegrationSpec extends ConsumerBaseSpec {
@SpringBean
- EventsProducer mockEventsProducer = Mock()
+ EventProducer mockEventProducer = Mock()
@SpringBean
NcmpAsyncRequestResponseEventMapper mapper = Stub() { toNcmpAsyncEvent(_) >> new NcmpAsyncRequestResponseEvent(eventId: 'my-event-id', eventTarget: 'some client topic')}
and: 'a flag to track the send cloud event call'
def sendCloudEventMethodCalled = false
and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the cloud event'
- mockEventsProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> {
+ mockEventProducer.sendCloudEvent('some client topic', 'some-correlation-id', cloudEvent) >> {
sendCloudEventMethodCalled = true
}
when: 'send the event'
and: 'a flag to track the send event call'
def sendEventMethodCalled = false
and: 'the (mocked) events producer will use the flag to indicate if it is called and will capture the event'
- mockEventsProducer.sendLegacyEvent(*_) >> {
+ mockEventProducer.sendLegacyEvent(*_) >> {
sendEventMethodCalled = true
}
when: 'send the event'
package org.onap.cps.ncmp.impl.data.utils
import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
import io.cloudevents.kafka.CloudEventDeserializer
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.api.data.models.DataOperationRequest
import org.onap.cps.ncmp.api.data.models.OperationType
import org.onap.cps.ncmp.api.inventory.models.CompositeStateBuilder
import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-@ContextConfiguration(classes = [EventsProducer, CpsApplicationContext])
+@ContextConfiguration(classes = [EventProducer, CpsApplicationContext])
class DmiDataOperationsHelperSpec extends MessagingBaseSpec {
def static clientTopic = 'my-topic-name'
JsonObjectMapper jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
@SpringBean
- EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
def 'Process per data operation request with #serviceName.'() {
given: 'data operation request with 3 operations'
import io.cloudevents.kafka.impl.KafkaHeaders
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.api.inventory.models.CompositeState
import org.onap.cps.ncmp.events.avc1_0_0.AvcEvent
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import static org.onap.cps.ncmp.utils.TestUtils.getResourceFileContent
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent
-@SpringBootTest(classes = [EventsProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
+@SpringBootTest(classes = [EventProducer, CmAvcEventConsumer, ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
class CmAvcEventConsumerSpec extends MessagingBaseSpec {
@SpringBean
- EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer eventsProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
def mockCmAvcEventService = Mock(CmAvcEventService)
def mockInventoryPersistence = Mock(InventoryPersistence)
import com.fasterxml.jackson.databind.ObjectMapper
import io.cloudevents.CloudEvent
import io.cloudevents.core.v1.CloudEventBuilder
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.config.CpsApplicationContext
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.CmHandle
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.Data
import org.onap.cps.ncmp.utils.events.CloudEventMapper
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.boot.test.context.SpringBootTest
-import org.springframework.test.context.ContextConfiguration
import spock.lang.Specification
-@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper, CloudEventBuilder])
-@ContextConfiguration(classes = [CpsApplicationContext])
+@SpringBootTest(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper, CloudEventBuilder])
class EventProducerSpec extends Specification {
- def mockEventsProducer = Mock(EventsProducer)
+ def mockEventProducer = Mock(EventProducer)
- def objectUnderTest = new EventProducer(mockEventsProducer)
+ def objectUnderTest = new DmiEventProducer(mockEventProducer)
def 'Create and Send Cm Notification Subscription DMI In Event'() {
given: 'a cm subscription for a dmi plugin'
when: 'the event is sent'
objectUnderTest.send(subscriptionId, dmiPluginName, eventType, dmiInEvent)
then: 'the event contains the required attributes'
- 1 * mockEventsProducer.sendCloudEvent(_, _, _) >> {
+ 1 * mockEventProducer.sendCloudEvent(_, _, _) >> {
args ->
{
assert args[0] == 'dmiplugin-test-topic'
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
-
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector
+import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiEventProducer
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper
-import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp_to_dmi.DataJobSubscriptionDmiInEvent
import org.onap.cps.ncmp.impl.datajobs.subscription.utils.CmDataJobSubscriptionPersistenceService
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence
import org.onap.cps.ncmp.impl.utils.JexParser
import spock.lang.Specification
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
+
class CmSubscriptionHandlerImplSpec extends Specification {
def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
def mockDmiInEventMapper = Mock(DmiInEventMapper)
- def mockDmiInEventProducer = Mock(EventProducer)
+ def mockDmiInEventProducer = Mock(DmiEventProducer)
def mockInventoryPersistence = Mock(InventoryPersistence)
def mockAlternateIdMatcher = Mock(AlternateIdMatcher)
def getFdn(dataNodeSelector) {
return JexParser.extractFdnPrefix(dataNodeSelector).orElse("")
}
-}
\ No newline at end of file
+}
import org.onap.cps.impl.DataNodeBuilder
import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
-import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsHelper
-import org.onap.cps.utils.ContentType
+import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventProducer
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
import spock.lang.Specification
def jsonObjectMapper = new JsonObjectMapper(new ObjectMapper())
def mockAlternateIdChecker = Mock(AlternateIdChecker)
def mockCmHandleIdPerAlternateId = Mock(IMap)
- def mockLcmEventsHelper = Mock(LcmEventsHelper)
+ def mockLcmEventProducer = Mock(LcmEventProducer)
- def objectUnderTest = new CmHandleRegistrationServicePropertyHandler(mockInventoryPersistence, mockCpsDataService, jsonObjectMapper, mockAlternateIdChecker, mockCmHandleIdPerAlternateId, mockLcmEventsHelper)
+ def objectUnderTest = new CmHandleRegistrationServicePropertyHandler(mockInventoryPersistence, mockCpsDataService, jsonObjectMapper, mockAlternateIdChecker, mockCmHandleIdPerAlternateId, mockLcmEventProducer)
def logger = Spy(ListAppender<ILoggingEvent>)
void setup() {
then: 'the update node leaves method is invoked once with correct parameters'
1 * mockInventoryPersistence.updateCmHandleField('cmHandleId', 'data-producer-identifier', 'New Data Producer Identifier')
and: 'LCM event is sent'
- 1 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously({ cmHandleTransitionPairs ->
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously({cmHandleTransitionPairs ->
assert cmHandleTransitionPairs[0].targetYangModelCmHandle.dataProducerIdentifier == 'New Data Producer Identifier'
})
where: 'the following scenarios are used'
then: 'the update node leaves method is not invoked'
0 * mockCpsDataService.updateNodeLeaves(*_)
and: 'No LCM events are sent'
- 0 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously(*_)
+ 0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_)
and: 'debug information is logged'
def loggingEvent = logger.list[0]
assert loggingEvent.level == Level.DEBUG
then: 'the update node leaves method is invoked once with correct parameters'
1 * mockInventoryPersistence.updateCmHandleField('cmHandleId', 'data-producer-identifier', 'newDataProducerIdentifier')
and: 'LCM event is sent'
- 1 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously( { cmHandleTransitionPairs ->
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously( {cmHandleTransitionPairs ->
assert cmHandleTransitionPairs[0].targetYangModelCmHandle.dataProducerIdentifier == 'newDataProducerIdentifier'
assert cmHandleTransitionPairs[0].currentYangModelCmHandle.dataProducerIdentifier == 'oldDataProducerIdentifier'
})
then: 'the update node leaves method is not invoked'
0 * mockCpsDataService.updateNodeLeaves(*_)
and: 'No LCM events are sent'
- 0 * mockLcmEventsHelper.sendLcmEventBatchAsynchronously(*_)
+ 0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_)
and: 'warning is logged'
def lastLoggingEvent = logger.list[0]
assert lastLoggingEvent.level == Level.WARN
1 * mockCmHandlesByState.putIfAbsent("lockedCmHandlesCount", expectedValue)
1 * mockCmHandlesByState.putIfAbsent("deletingCmHandlesCount", expectedValue)
where:
- scenario | queryResult || expectedValue
- 'query service returns zero cm handle id'| [] || 0
- 'query service returns 1 cm handle id' | ['someId'] || 1
+ scenario | queryResult || expectedValue
+ 'query service returns zero cm handle id'| [] || 0
+ 'query service returns 1 cm handle id' | ['someId'] || 1
}
def 'Update cm handle state metric'() {
- given: 'a collection of cm handle state pair'
+ given: 'a cm handle state pair'
def cmHandleTransitionPair = new CmHandleTransitionPair(new YangModelCmHandle(compositeState: new CompositeState(cmHandleState: ADVISED)),
new YangModelCmHandle(compositeState: new CompositeState(cmHandleState: READY))
)
then: 'the new value is as expected'
assert entryProcessingMap.get(key) == expectedValue
where: 'the following data is used'
- scenario | key || expectedValue
- 'current value of count is zero'| 'zeroCmHandlesCount'|| 0
- 'current value of count is >0' | 'tenCmHandlesCount' || 9
+ scenario | key || expectedValue
+ 'current value of count is zero'| 'zeroCmHandlesCount' || 0
+ 'current value of count is >0' | 'tenCmHandlesCount' || 9
}
def 'Applying increasing entry processor to a key on map'() {
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.events.LegacyEvent
import org.onap.cps.ncmp.events.lcm.v1.Event
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
@Testcontainers
@DirtiesContext
-class EventsProducerSpec extends MessagingBaseSpec {
+class EventProducerSpec extends MessagingBaseSpec {
def legacyEventKafkaConsumer = new KafkaConsumer<String, LegacyEvent>(eventConsumerConfigProperties('ncmp-group', StringDeserializer))
-
def testTopic = 'ncmp-events-test'
@SpringBean
- EventsProducer eventsProducer = new EventsProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
+ EventProducer eventProducer = new EventProducer(legacyEventKafkaTemplate, cloudEventKafkaTemplate, cloudEventKafkaTemplateForEos)
@Autowired
JsonObjectMapper jsonObjectMapper
-
def 'Produce and Consume Event'() {
given: 'event key and event data'
def eventKey = 'lcm'
and: 'consumer has a subscription'
legacyEventKafkaConsumer.subscribe([testTopic] as List<String>)
when: 'an event is sent'
- eventsProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData)
+ eventProducer.sendLegacyEvent(testTopic, eventKey, eventHeader, eventData)
and: 'topic is polled'
def records = legacyEventKafkaConsumer.poll(Duration.ofMillis(1500))
then: 'poll returns one record'
assert records.size() == 1
and: 'record key matches the expected event key'
- def record = records.iterator().next()
- assert eventKey == record.key
+ assert eventKey == records[0].key
and: 'record matches the expected event'
def expectedJsonString = TestUtils.getResourceFileContent('expectedLcmEvent.json')
def expectedLcmEvent = jsonObjectMapper.convertJsonString(expectedJsonString, LcmEvent.class)
- assert expectedLcmEvent == jsonObjectMapper.convertJsonString(record.value, LcmEvent.class)
+ assert expectedLcmEvent == jsonObjectMapper.convertJsonString(records[0].value, LcmEvent.class)
and: 'record header matches the expected parameters'
- assert SerializationUtils.deserialize(record.headers().lastHeader('eventId').value()) == eventId
- assert SerializationUtils.deserialize(record.headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId
+ assert SerializationUtils.deserialize(records[0].headers().lastHeader('eventId').value()) == eventId
+ assert SerializationUtils.deserialize(records[0].headers().lastHeader('eventCorrelationId').value()) == eventCorrelationId
}
}
package org.onap.cps.ncmp.impl.inventory.sync.lcm
-import org.mapstruct.factory.Mappers
import org.onap.cps.ncmp.api.inventory.models.CmHandleState
import org.onap.cps.ncmp.api.inventory.models.CompositeState
import org.onap.cps.ncmp.api.inventory.models.NcmpServiceCmHandle
import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.DELETING
import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
-class LcmEventsProducerHelperSpec extends Specification {
+class LcmEventObjectCreatorSpec extends Specification {
- LcmEventHeaderMapper lcmEventsHeaderMapper = Mappers.getMapper(LcmEventHeaderMapper)
-
- def objectUnderTest = new LcmEventsProducerHelper(lcmEventsHeaderMapper)
+ def objectUnderTest = new LcmEventObjectCreator()
def cmHandleId = 'test-cm-handle'
def 'Map the LcmEvent for #operation'() {
given: 'NCMP cm handle details with current and old properties'
- def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: currentCmHandleState),
- publicProperties: currentPublicProperties)
- def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: targetCmHandleState),
- publicProperties: targetPublicProperties)
+ def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: currentCmHandleState), publicProperties: currentPublicProperties)
+ def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: targetCmHandleState), publicProperties: targetPublicProperties)
when: 'the lcm event is created'
- def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+ def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
then: 'event header is mapped correctly'
assert result.eventSource == 'org.onap.ncmp'
assert result.eventCorrelationId == cmHandleId
def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: READY),
publicProperties: publicProperties)
when: 'the lcm event is created'
- def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+ def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
then: 'Properties are just the one which are same'
assert result.event.oldValues == null
assert result.event.newValues == null
publicProperties: ['publicProperty1': 'value11', 'publicProperty2': 'value22'])
def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, publicProperties: ['publicProperty1': 'value1', 'publicProperty2': 'value2'])
when: 'the lcm event is created'
- def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmhandle)
+ def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmhandle)
then: 'event header is mapped correctly'
assert result.eventSource == 'org.onap.ncmp'
assert result.eventCorrelationId == cmHandleId
def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: true, cmHandleState: DELETING),
publicProperties: ['publicProperty1': 'value1'])
when: 'the lcm event is created'
- def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+ def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
then: 'event header is mapped correctly'
assert result.eventSource == 'org.onap.ncmp'
assert result.eventCorrelationId == cmHandleId
def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: currentDataSyncEnableFlag, cmHandleState: ADVISED))
def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: targetDataSyncEnableFlag, cmHandleState: READY))
when: 'the lcm event is created'
- def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+ def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
then: 'event header is mapped correctly'
assert result.eventSource == 'org.onap.ncmp'
assert result.eventCorrelationId == cmHandleId
def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: currentDataSyncEnableFlag, cmHandleState: ADVISED))
def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(dataSyncEnabled: targetDataSyncEnableFlag, cmHandleState: READY))
when: 'the lcm event is created'
- def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+ def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
then: 'the data sync flag is not present in the event'
assert result.event.oldValues.dataSyncEnabled == null
assert result.event.newValues.dataSyncEnabled == null
'null to null' | null | null
}
- def 'Map the LcmEventHeader'() {
- given: 'NCMP cm handle details with current and old details'
- def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: ADVISED))
- def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, compositeState: new CompositeState(cmHandleState: READY))
- when: 'the lcm event header is created'
- def result = objectUnderTest.createLcmEventHeader(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
- then: 'the header field are populated'
- assert result.eventCorrelationId == cmHandleId
- assert result.eventId != null
- }
-
def 'Map the LcmEvent for alternate ID, data producer identifier, and module set tag when they contain #scenario'() {
given: 'NCMP cm handle details with current and old values for alternate ID, module set tag, and data producer identifier'
def currentNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, alternateId: currentAlternateId, moduleSetTag: currentModuleSetTag, dataProducerIdentifier: currentDataProducerIdentifier, compositeState: new CompositeState(dataSyncEnabled: false))
def targetNcmpServiceCmHandle = new NcmpServiceCmHandle(cmHandleId: cmHandleId, alternateId: targetAlternateId, moduleSetTag: targetModuleSetTag, dataProducerIdentifier: targetDataProducerIdentifier, compositeState: new CompositeState(dataSyncEnabled: false))
when: 'the lcm event is created'
- def result = objectUnderTest.createLcmEvent(cmHandleId, currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
+ def result = objectUnderTest.createLcmEvent(currentNcmpServiceCmHandle, targetNcmpServiceCmHandle)
then: 'the alternate ID, module set tag, and data producer identifier are present or are an empty string in the payload'
assert result.event.alternateId == targetAlternateId
assert result.event.moduleSetTag == targetModuleSetTag
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.impl.inventory.sync.lcm
+
+import io.micrometer.core.instrument.Tag
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry
+import org.onap.cps.events.EventProducer
+import org.onap.cps.ncmp.api.inventory.models.CompositeState
+import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle
+import org.springframework.kafka.KafkaException
+import spock.lang.Specification
+
+import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.ADVISED
+import static org.onap.cps.ncmp.api.inventory.models.CmHandleState.READY
+
+class LcmEventProducerSpec extends Specification {
+
+ def mockEventProducer = Mock(EventProducer)
+ def lcmEventObjectCreator = new LcmEventObjectCreator()
+ def meterRegistry = new SimpleMeterRegistry()
+
+ def objectUnderTest = new LcmEventProducer(mockEventProducer, lcmEventObjectCreator, meterRegistry)
+
+ def cmHandleTransitionPair = new CmHandleTransitionPair(
+ new YangModelCmHandle(id: 'ch-1', compositeState: new CompositeState(cmHandleState: ADVISED), additionalProperties: [], publicProperties: []),
+ new YangModelCmHandle(id: 'ch-1', compositeState: new CompositeState(cmHandleState: READY), additionalProperties: [], publicProperties: [])
+ )
+
+ def 'Create and send lcm event where notifications are #scenario.'() {
+ given: 'notificationsEnabled is #notificationsEnabled'
+ objectUnderTest.notificationsEnabled = notificationsEnabled
+ when: 'service is called to send a batch of lcm events'
+ objectUnderTest.sendLcmEventBatchAsynchronously([cmHandleTransitionPair])
+ then: 'producer is called #expectedTimesMethodCalled times with correct identifiers'
+ expectedTimesMethodCalled * mockEventProducer.sendLegacyEvent(_, 'ch-1', _, _) >> {
+ args -> {
+ def eventHeaders = args[2]
+ assert UUID.fromString(eventHeaders.get('eventId')) != null
+ assert eventHeaders.get('eventCorrelationId') == 'ch-1'
+ }
+ }
+ and: 'metrics are recorded with correct tags'
+ def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer()
+ if (notificationsEnabled) {
+ assert timer.count() == 1
+ assert timer.id.tags.containsAll(Tag.of('oldCmHandleState', 'ADVISED'), Tag.of('newCmHandleState', 'READY'))
+ } else {
+ assert timer == null
+ }
+ where: 'the following values are used'
+ scenario | notificationsEnabled || expectedTimesMethodCalled
+ 'enabled' | true || 1
+ 'disabled' | false || 0
+ }
+
+ def 'Exception while sending message.'(){
+ given: 'notifications are enabled'
+ objectUnderTest.notificationsEnabled = true
+ when: 'producer set to throw an exception'
+ mockEventProducer.sendLegacyEvent(*_) >> { throw new KafkaException('sending failed')}
+ and: 'attempt to send events'
+ objectUnderTest.sendLcmEventBatchAsynchronously([cmHandleTransitionPair])
+ then: 'the exception is just logged and not bubbled up'
+ noExceptionThrown()
+ and: 'metrics are not recorded'
+ assert meterRegistry.find('cps.ncmp.lcm.events.send').timer() == null
+ }
+
+}
}
def mockInventoryPersistence = Mock(InventoryPersistence)
- def mockLcmEventsCreator = Mock(LcmEventsProducerHelper)
- def mockLcmEventsProducer = Mock(LcmEventsProducer)
+ def mockLcmEventProducer = Mock(LcmEventProducer)
def mockCmHandleStateMonitor = Mock(CmHandleStateMonitor)
- def lcmEventsHelper = new LcmEventsHelper(mockLcmEventsCreator, mockLcmEventsProducer)
- def objectUnderTest = new LcmEventsCmHandleStateHandlerImpl(mockInventoryPersistence, lcmEventsHelper, mockCmHandleStateMonitor)
+ def objectUnderTest = new LcmEventsCmHandleStateHandlerImpl(mockInventoryPersistence, mockLcmEventProducer, mockCmHandleStateMonitor)
def cmHandleId = 'cmhandle-id-1'
def currentCompositeState
assert loggingEvent.level == Level.DEBUG
assert loggingEvent.formattedMessage == "${cmHandleId} is now in ${toCmHandleState} state"
and: 'event service is called to send event'
- 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
where: 'state change parameters are provided'
stateChange | fromCmHandleState | toCmHandleState
'ADVISED to READY' | ADVISED | READY
then: 'CM-handle is saved using inventory persistence'
1 * mockInventoryPersistence.saveCmHandleBatch(List.of(yangModelCmHandle))
and: 'event service is called to send event'
- 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
and: 'a log entry is written'
assert getLogMessage(0) == "${cmHandleId} is now in ADVISED state"
}
assert cmHandleStatePerCmHandleId.get(cmHandleId).lockReason.details == 'some lock details'
})
and: 'event service is called to send event'
- 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
and: 'a log entry is written'
assert getLogMessage(0) == "${cmHandleId} is now in ADVISED state"
}
assert cmHandleStatePerCmHandleId.get(cmHandleId).dataStores.operationalDataStore.dataStoreSyncState == DataStoreSyncState.NONE_REQUESTED
})
and: 'event service is called to send event'
- 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
and: 'a log entry is written'
assert getLogMessage(0) == "${cmHandleId} is now in READY state"
}
and: 'method to persist cm handle state is called once'
1 * mockInventoryPersistence.saveCmHandleStateBatch([(cmHandleId): yangModelCmHandle.compositeState])
and: 'the method to send Lcm event is called once'
- 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
}
def 'Update cmHandle state to DELETING to DELETED' (){
then: 'the cm handle state is as expected'
yangModelCmHandle.getCompositeState().getCmHandleState() == DELETED
and: 'the method to send Lcm event is called once'
- 1 * mockLcmEventsProducer.sendLcmEvent(cmHandleId, _, _)
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
}
def 'No state change and no event to be sent'() {
1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST)
1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP)
and: 'no event will be sent'
- 0 * mockLcmEventsProducer.sendLcmEvent(*_)
+ 0 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(*_)
}
def 'Batch of new cm handles provided'() {
})
and: 'no state updates are persisted'
1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP)
- and: 'event service is called to send events'
- 2 * mockLcmEventsProducer.sendLcmEvent(_, _, _)
+ and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)'
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
and: 'two log entries are written'
assert getLogMessage(0) == 'cmhandle1 is now in ADVISED state'
assert getLogMessage(1) == 'cmhandle2 is now in ADVISED state'
})
and: 'no new handles are persisted'
1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST)
- and: 'event service is called to send events'
- 2 * mockLcmEventsProducer.sendLcmEvent(_, _, _)
+ and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)'
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
and: 'two log entries are written'
assert getLogMessage(0) == 'cmhandle1 is now in READY state'
assert getLogMessage(1) == 'cmhandle2 is now in DELETING state'
1 * mockInventoryPersistence.saveCmHandleStateBatch(EMPTY_MAP)
and: 'no new handles are persisted'
1 * mockInventoryPersistence.saveCmHandleBatch(EMPTY_LIST)
- and: 'event service is called to send events'
- 2 * mockLcmEventsProducer.sendLcmEvent(_, _, _)
+ and: 'event service is called once to send 1 batch of 2 events (TODO Confirm size)'
+ 1 * mockLcmEventProducer.sendLcmEventBatchAsynchronously(_)
and: 'two log entries are written'
assert getLogMessage(0) == 'cmhandle1 is now in DELETED state'
assert getLogMessage(1) == 'cmhandle2 is now in DELETED state'
then: 'the exception is not handled'
thrown(RuntimeException)
and: 'no events are sent'
- 0 * mockLcmEventsProducer.sendLcmEvent(_, _, _)
+ 0 * mockLcmEventProducer.sendLcmEvent(*_)
and: 'no log entries are written'
assert logAppender.list.empty
}
def setupBatch(type) {
-
def yangModelCmHandle1 = new YangModelCmHandle(id: 'cmhandle1', additionalProperties: [], publicProperties: [])
def yangModelCmHandle2 = new YangModelCmHandle(id: 'cmhandle2', additionalProperties: [], publicProperties: [])
-
switch (type) {
case 'NEW':
return [yangModelCmHandle1, yangModelCmHandle2]
-
case 'DELETED':
yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: READY)
yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY)
return [(yangModelCmHandle1): DELETED, (yangModelCmHandle2): DELETED]
-
case 'UPDATE':
yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED)
yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY)
return [(yangModelCmHandle1): READY, (yangModelCmHandle2): DELETING]
-
case 'NO_CHANGE':
yangModelCmHandle1.compositeState = new CompositeState(cmHandleState: ADVISED)
yangModelCmHandle2.compositeState = new CompositeState(cmHandleState: READY)
return [(yangModelCmHandle1): ADVISED, (yangModelCmHandle2): READY]
-
default:
throw new IllegalArgumentException("batch type '${type}' not recognized")
}
+++ /dev/null
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2022-2025 OpenInfra Foundation Europe. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.impl.inventory.sync.lcm
-
-import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.ADVISED
-import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.READY
-
-import io.micrometer.core.instrument.Tag
-import io.micrometer.core.instrument.simple.SimpleMeterRegistry
-import org.onap.cps.events.EventsProducer
-import org.onap.cps.ncmp.events.lcm.v1.Event
-import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
-import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader
-import org.onap.cps.ncmp.events.lcm.v1.Values
-import org.onap.cps.utils.JsonObjectMapper
-import org.springframework.kafka.KafkaException
-import spock.lang.Specification
-
-class LcmEventsProducerSpec extends Specification {
-
- def mockLcmEventsProducer = Mock(EventsProducer)
- def mockJsonObjectMapper = Mock(JsonObjectMapper)
- def meterRegistry = new SimpleMeterRegistry()
-
- def objectUnderTest = new LcmEventsProducer(mockLcmEventsProducer, mockJsonObjectMapper, meterRegistry)
-
- def 'Create and send lcm event where events are #scenario'() {
- given: 'a cm handle id and Lcm Event'
- def cmHandleId = 'test-cm-handle-id'
- def eventId = UUID.randomUUID().toString()
- def event = getEventWithCmHandleState(ADVISED, READY)
- def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId)
- and: 'we also have a lcm event header'
- def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
- and: 'notificationsEnabled is #notificationsEnabled and it will be true as default'
- objectUnderTest.notificationsEnabled = notificationsEnabled
- and: 'lcm event header is transformed to headers map'
- mockJsonObjectMapper.convertToValueType(lcmEventHeader, Map.class) >> ['eventId': eventId, 'eventCorrelationId': cmHandleId]
- when: 'service is called to send lcm event'
- objectUnderTest.sendLcmEvent('test-cm-handle-id', lcmEvent, lcmEventHeader)
- then: 'producer is called #expectedTimesMethodCalled times'
- expectedTimesMethodCalled * mockLcmEventsProducer.sendLegacyEvent(_, cmHandleId, _, lcmEvent) >> {
- args -> {
- def eventHeaders = (args[2] as Map<String,Object>)
- assert eventHeaders.containsKey('eventId')
- assert eventHeaders.containsKey('eventCorrelationId')
- assert eventHeaders.get('eventId') == eventId
- assert eventHeaders.get('eventCorrelationId') == cmHandleId
- }
- }
- and: 'metrics are recorded with correct tags'
- def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer()
- if (notificationsEnabled) {
- assert timer != null
- assert timer.count() == expectedTimesMethodCalled
- def tags = timer.getId().getTags()
- assert tags.containsAll(Tag.of('oldCmHandleState', ADVISED.value()), Tag.of('newCmHandleState', READY.value()))
- } else {
- assert timer == null
- }
- where: 'the following values are used'
- scenario | notificationsEnabled || expectedTimesMethodCalled
- 'enabled' | true || 1
- 'disabled' | false || 0
- }
-
- def 'Unable to send message'(){
- given: 'a cm handle id and Lcm Event and notification enabled'
- def cmHandleId = 'test-cm-handle-id'
- def eventId = UUID.randomUUID().toString()
- and: 'event #event'
- def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId)
- def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
- objectUnderTest.notificationsEnabled = true
- when: 'producer set to throw an exception'
- mockLcmEventsProducer.sendLegacyEvent(_, _, _, _) >> { throw new KafkaException('sending failed')}
- and: 'an event is publised'
- objectUnderTest.sendLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
- then: 'the exception is just logged and not bubbled up'
- noExceptionThrown()
- and: 'metrics are recorded with error tags'
- def timer = meterRegistry.find('cps.ncmp.lcm.events.send').timer()
- assert timer != null
- assert timer.count() == 1
- def expectedTags = [Tag.of('oldCmHandleState', 'N/A'), Tag.of('newCmHandleState', 'N/A')]
- def tags = timer.getId().getTags()
- assert tags.containsAll(expectedTags)
- where: 'the following values are used'
- scenario | event
- 'without values' | new Event()
- 'without cm handle state' | getEvent()
- }
-
- def getEvent() {
- def event = new Event()
- def values = new Values()
- event.setOldValues(values)
- event.setNewValues(values)
- event
- }
-
- def getEventWithCmHandleState(oldCmHandleState, newCmHandleState) {
- def event = new Event()
- def advisedCmHandleStateValues = new Values()
- advisedCmHandleStateValues.setCmHandleState(oldCmHandleState)
- event.setOldValues(advisedCmHandleStateValues)
- def readyCmHandleStateValues = new Values()
- readyCmHandleStateValues.setCmHandleState(newCmHandleState)
- event.setNewValues(readyCmHandleStateValues)
- return event
- }
-}
package org.onap.cps.ncmp.utils.events
import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
-import org.onap.cps.events.EventsProducer
+import org.onap.cps.events.EventProducer
import org.onap.cps.ncmp.config.CpsApplicationContext
import org.onap.cps.ncmp.events.avc.ncmp_to_client.Avc
import org.onap.cps.ncmp.events.avc.ncmp_to_client.AvcEvent
@ContextConfiguration(classes = [CpsApplicationContext, ObjectMapper, JsonObjectMapper])
class InventoryEventProducerSpec extends MessagingBaseSpec {
- def mockEventsProducer = Mock(EventsProducer)
- def objectUnderTest = new InventoryEventProducer(mockEventsProducer)
+ def mockEventProducer = Mock(EventProducer)
+ def objectUnderTest = new InventoryEventProducer(mockEventProducer)
def 'Send an attribute value change event'() {
given: 'the event key'
when: 'an attribute value change event is sent'
objectUnderTest.sendAvcEvent(someEventKey, someAttributeName, someOldAttributeValue, someNewAttributeValue)
then: 'the cloud event producer is invoked with the correct data'
- 1 * mockEventsProducer.sendCloudEvent(_, someEventKey,
+ 1 * mockEventProducer.sendCloudEvent(_, someEventKey,
cloudEvent -> {
def actualAvcs = CloudEventMapper.toTargetEvent(cloudEvent, AvcEvent.class).data.attributeValueChange
def expectedAvc = new Avc(attributeName: someAttributeName,
@RequiredArgsConstructor
public class CpsDataUpdateEventsProducer {
- private final EventsProducer eventsProducer;
+ private final EventProducer eventProducer;
private final CpsNotificationService cpsNotificationService;
final CloudEvent cpsDataUpdatedEventAsCloudEvent =
CpsEvent.builder().type(CpsDataUpdatedEvent.class.getTypeName()).data(cpsDataUpdatedEvent)
.extensions(extensions).build().asCloudEvent();
- eventsProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
+ eventProducer.sendCloudEvent(topicName, updateEventId, cpsDataUpdatedEventAsCloudEvent);
} else {
log.debug("State of Overall Notifications : {} and Cps Change Event Notifications : {}",
notificationsEnabled, cpsChangeEventNotificationsEnabled);
import org.springframework.util.SerializationUtils;
/**
- * EventsProducer to send events.
+ * EventProducer to send events.
*/
@Slf4j
@Service
@RequiredArgsConstructor
-public class EventsProducer {
+public class EventProducer {
/**
* KafkaTemplate for legacy (non-cloud) events.
- * Note: Cloud events should be used. This will be addressed as part of <a
- * href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
+ * Note: Cloud events should be used. This will be addressed as part of
+ * <a href="https://lf-onap.atlassian.net/browse/CPS-1717">...</a>.
*/
@Qualifier("legacyEventKafkaTemplate")
private final KafkaTemplate<String, LegacyEvent> legacyEventKafkaTemplate;
import static org.onap.cps.events.model.EventPayload.Action.REPLACE
@ContextConfiguration(classes = [ObjectMapper, JsonObjectMapper])
-class CpsDataUpdateEventsProducerSpec extends Specification {
+class CpsDataUpdateEventProducerSpec extends Specification {
static def CREATE_ACTION = CREATE.value()
static def REPLACE_ACTION = REPLACE.value()
static def REMOVE_ACTION = REMOVE.value()
- def mockEventsProducer = Mock(EventsProducer)
+ def mockEventProducer = Mock(EventProducer)
def objectMapper = new ObjectMapper();
def mockCpsNotificationService = Mock(CpsNotificationService)
- def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventsProducer, mockCpsNotificationService)
+ def objectUnderTest = new CpsDataUpdateEventsProducer(mockEventProducer, mockCpsNotificationService)
def setup() {
mockCpsNotificationService.isNotificationEnabled('dataspace01', 'anchor01') >> true
when: 'service is called to send data update event'
objectUnderTest.sendCpsDataUpdateEvent(anchor, xpath, actionInRequest, OffsetDateTime.now())
then: 'the event contains the required attributes'
- 1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
+ 1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _) >> {
args ->
{
def cpsDataUpdatedEvent = (args[2] as CloudEvent)
when: 'service is called to send data event'
objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null)
then: 'the event is sent'
- 1 * mockEventsProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
+ 1 * mockEventProducer.sendCloudEvent('cps-core-event', 'dataspace01:anchor01', _)
}
def 'Enabling and disabling sending cps events.'() {
when: 'service is called to send data event'
objectUnderTest.sendCpsDataUpdateEvent(anchor, '/', CREATE_ACTION, null)
then: 'the event is only sent when all related flags are true'
- expectedCallsToProducer * mockEventsProducer.sendCloudEvent(*_)
+ expectedCallsToProducer * mockEventProducer.sendCloudEvent(*_)
where: 'the following flags are used'
notificationsEnabled | cpsChangeEventNotificationsEnabled | cpsNotificationServiceisNotificationEnabled || expectedCallsToProducer
false | true | true || 0
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.header.Headers
-import org.apache.kafka.common.header.internals.RecordHeader
-import org.apache.kafka.common.header.internals.RecordHeaders
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
import java.util.concurrent.CompletableFuture
-class EventsProducerSpec extends Specification {
+class EventProducerSpec extends Specification {
def mockLegacyKafkaTemplate = Mock(KafkaTemplate)
def mockCloudEventKafkaTemplate = Mock(KafkaTemplate)
def logger = Spy(ListAppender<ILoggingEvent>)
void setup() {
- def setupLogger = ((Logger) LoggerFactory.getLogger(EventsProducer.class))
+ def setupLogger = ((Logger) LoggerFactory.getLogger(EventProducer.class))
setupLogger.setLevel(Level.DEBUG)
setupLogger.addAppender(logger)
logger.start()
}
void cleanup() {
- ((Logger) LoggerFactory.getLogger(EventsProducer.class)).detachAndStopAllAppenders()
+ ((Logger) LoggerFactory.getLogger(EventProducer.class)).detachAndStopAllAppenders()
}
- def objectUnderTest = new EventsProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)
+ def objectUnderTest = new EventProducer(mockLegacyKafkaTemplate, mockCloudEventKafkaTemplate, mockCloudEventKafkaTemplateForEos)
def 'Send Cloud Event'() {
given: 'a successfully sent event'
getProducerRecord() >> Mock(ProducerRecord)
}
def runtimeException = new RuntimeException('some runtime exception')
- def logOutcomeMethod = EventsProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
+ def logOutcomeMethod = EventProducer.getDeclaredMethod('logOutcome', String, SendResult, Throwable, boolean)
logOutcomeMethod.accessible = true
when: 'logging the outcome with throwKafkaException set to true'
logOutcomeMethod.invoke(null, 'some-topic', sendResult, runtimeException, true)