/*
 * ============LICENSE_START=======================================================
 * Copyright (C) 2024 Nordix Foundation
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
21 package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer;
23 import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
24 import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
25 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
27 import io.cloudevents.CloudEvent;
29 import lombok.RequiredArgsConstructor;
30 import lombok.extern.slf4j.Slf4j;
31 import org.apache.kafka.clients.consumer.ConsumerRecord;
32 import org.onap.cps.ncmp.api.NcmpResponseStatus;
33 import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionEventsHandler;
34 import org.onap.cps.ncmp.api.impl.events.cmsubscription.CmNotificationSubscriptionMappersHandler;
35 import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler;
36 import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
37 import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.DmiCmNotificationSubscriptionDetails;
38 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
39 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data;
40 import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.ncmp_to_client.CmNotificationSubscriptionNcmpOutEvent;
41 import org.springframework.kafka.annotation.KafkaListener;
42 import org.springframework.stereotype.Component;
46 @RequiredArgsConstructor
47 public class CmNotificationSubscriptionDmiOutEventConsumer {
49 private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
50 private final CmNotificationSubscriptionEventsHandler cmNotificationSubscriptionEventsHandler;
51 private final CmNotificationSubscriptionMappersHandler cmNotificationSubscriptionMappersHandler;
53 private static final String CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#";
56 * Consume the Cm Notification Subscription event from the dmi-plugin.
58 * @param cmNotificationSubscriptionDmiOutEventConsumerRecord the event to be consumed
60 @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
61 containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
62 public void consumeCmNotificationSubscriptionDmiOutEvent(
63 final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiOutEventConsumerRecord) {
64 final CloudEvent cloudEvent = cmNotificationSubscriptionDmiOutEventConsumerRecord.value();
65 final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent =
66 toTargetEvent(cloudEvent, CmNotificationSubscriptionDmiOutEvent.class);
67 final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
68 if (cmNotificationSubscriptionDmiOutEvent != null && correlationId != null) {
69 final String eventType = cloudEvent.getType();
70 handleCmSubscriptionDmiOutEvent(correlationId, eventType, cmNotificationSubscriptionDmiOutEvent);
74 private void handleCmSubscriptionDmiOutEvent(final String correlationId,
75 final String eventType,
76 final CmNotificationSubscriptionDmiOutEvent
77 cmNotificationSubscriptionDmiOutEvent) {
78 final String subscriptionId = correlationId.split(CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
79 final String dmiPluginName = correlationId.split(CM_DATA_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
81 if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, cmNotificationSubscriptionDmiOutEvent.getData())) {
82 handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
83 if (eventType.equals("subscriptionCreateResponse")) {
84 dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
86 if (eventType.equals("subscriptionDeleteResponse")) {
87 dmiCmNotificationSubscriptionCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName);
89 handleEventsStatusPerDmi(subscriptionId, eventType);
92 if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, cmNotificationSubscriptionDmiOutEvent.getData())) {
93 handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED);
94 handleEventsStatusPerDmi(subscriptionId, eventType);
97 log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
98 dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage());
101 private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
102 final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
103 dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
104 dmiPluginName, cmNotificationSubscriptionStatus);
107 private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
108 final Map<String, DmiCmNotificationSubscriptionDetails> dmiCmNotificationSubscriptionDetailsPerDmi =
109 dmiCmNotificationSubscriptionCacheHandler.get(subscriptionId);
110 final CmNotificationSubscriptionNcmpOutEvent cmNotificationSubscriptionNcmpOutEvent =
111 cmNotificationSubscriptionMappersHandler.toCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
112 dmiCmNotificationSubscriptionDetailsPerDmi);
113 cmNotificationSubscriptionEventsHandler.publishCmNotificationSubscriptionNcmpOutEvent(subscriptionId,
114 eventType, cmNotificationSubscriptionNcmpOutEvent, false);
117 private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
118 final Data cmNotificationSubscriptionDmiOutData) {
119 return ncmpResponseStatus.getCode().equals(cmNotificationSubscriptionDmiOutData.getStatusCode())
120 && ncmpResponseStatus.getMessage()
121 .equals(cmNotificationSubscriptionDmiOutData.getStatusMessage());