/*
 * ============LICENSE_START=======================================================
 * Copyright (C) 2024 Nordix Foundation
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
21 package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
23 import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
24 import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
25 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
27 import io.cloudevents.CloudEvent;
29 import lombok.RequiredArgsConstructor;
30 import lombok.extern.slf4j.Slf4j;
31 import org.apache.kafka.clients.consumer.ConsumerRecord;
32 import org.onap.cps.ncmp.api.NcmpResponseStatus;
33 import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
34 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
35 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
36 import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventMapper;
37 import org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp.NcmpOutEventProducer;
38 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data;
39 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent;
40 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
41 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
42 import org.springframework.kafka.annotation.KafkaListener;
43 import org.springframework.stereotype.Component;
47 @RequiredArgsConstructor
48 @ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
49 public class DmiOutEventConsumer {
51 private final DmiCacheHandler dmiCacheHandler;
52 private final NcmpOutEventProducer ncmpOutEventProducer;
53 private final NcmpOutEventMapper ncmpOutEventMapper;
55 private static final String CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#";
58 * Consume the Cm Notification Subscription event from the dmi-plugin.
60 * @param dmiOutEventAsConsumerRecord the event to be consumed
62 @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
63 containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
64 public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) {
65 final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value();
66 final DmiOutEvent dmiOutEvent = toTargetEvent(cloudEvent, DmiOutEvent.class);
67 final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
68 if (dmiOutEvent != null && correlationId != null) {
69 final String eventType = cloudEvent.getType();
70 handleDmiOutEvent(correlationId, eventType, dmiOutEvent);
74 private void handleDmiOutEvent(final String correlationId, final String eventType,
75 final DmiOutEvent dmiOutEvent) {
76 final String subscriptionId = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
77 final String dmiPluginName = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
79 if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, dmiOutEvent.getData())) {
80 handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED);
81 if (eventType.equals("subscriptionCreateResponse")) {
82 dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
84 if (eventType.equals("subscriptionDeleteResponse")) {
85 dmiCacheHandler.removeFromDatabase(subscriptionId, dmiPluginName);
87 handleEventsStatusPerDmi(subscriptionId, eventType);
90 if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, dmiOutEvent.getData())) {
91 handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.REJECTED);
92 handleEventsStatusPerDmi(subscriptionId, eventType);
95 log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
96 dmiPluginName, dmiOutEvent.getData().getStatusMessage());
99 private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
100 final CmSubscriptionStatus cmSubscriptionStatus) {
101 dmiCacheHandler.updateDmiSubscriptionStatus(subscriptionId, dmiPluginName,
102 cmSubscriptionStatus);
105 private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
106 final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi = dmiCacheHandler.get(subscriptionId);
107 final NcmpOutEvent ncmpOutEvent = ncmpOutEventMapper.toNcmpOutEvent(subscriptionId, dmiSubscriptionsPerDmi);
108 ncmpOutEventProducer.publishNcmpOutEvent(subscriptionId, eventType, ncmpOutEvent, false);
111 private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
112 final Data dmiOutData) {
113 return ncmpResponseStatus.getCode().equals(dmiOutData.getStatusCode())
114 && ncmpResponseStatus.getMessage()
115 .equals(dmiOutData.getStatusMessage());