2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.impl.cmnotificationsubscription.dmi;
23 import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
24 import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_REJECTED;
25 import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
27 import io.cloudevents.CloudEvent;
29 import lombok.RequiredArgsConstructor;
30 import lombok.extern.slf4j.Slf4j;
31 import org.apache.kafka.clients.consumer.ConsumerRecord;
32 import org.onap.cps.ncmp.api.NcmpResponseStatus;
33 import org.onap.cps.ncmp.impl.cmnotificationsubscription.EventsFacade;
34 import org.onap.cps.ncmp.impl.cmnotificationsubscription.MappersFacade;
35 import org.onap.cps.ncmp.impl.cmnotificationsubscription.cache.DmiCacheHandler;
36 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.CmSubscriptionStatus;
37 import org.onap.cps.ncmp.impl.cmnotificationsubscription.models.DmiCmSubscriptionDetails;
38 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.Data;
39 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.dmi_to_ncmp.DmiOutEvent;
40 import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.ncmp_to_client.NcmpOutEvent;
41 import org.springframework.kafka.annotation.KafkaListener;
42 import org.springframework.stereotype.Component;
46 @RequiredArgsConstructor
47 public class DmiOutEventConsumer {
49 private final DmiCacheHandler dmiCacheHandler;
50 private final EventsFacade eventsFacade;
51 private final MappersFacade mappersFacade;
53 private static final String CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR = "#";
56 * Consume the Cm Notification Subscription event from the dmi-plugin.
58 * @param dmiOutEventAsConsumerRecord the event to be consumed
60 @KafkaListener(topics = "${app.ncmp.avc.cm-subscription-dmi-out}",
61 containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
62 public void consumeDmiOutEvent(final ConsumerRecord<String, CloudEvent> dmiOutEventAsConsumerRecord) {
63 final CloudEvent cloudEvent = dmiOutEventAsConsumerRecord.value();
64 final DmiOutEvent dmiOutEvent = toTargetEvent(cloudEvent, DmiOutEvent.class);
65 final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
66 if (dmiOutEvent != null && correlationId != null) {
67 final String eventType = cloudEvent.getType();
68 handleDmiOutEvent(correlationId, eventType, dmiOutEvent);
72 private void handleDmiOutEvent(final String correlationId, final String eventType,
73 final DmiOutEvent dmiOutEvent) {
74 final String subscriptionId = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[0];
75 final String dmiPluginName = correlationId.split(CM_SUBSCRIPTION_CORRELATION_ID_SEPARATOR)[1];
77 if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_ACCEPTED, dmiOutEvent.getData())) {
78 handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.ACCEPTED);
79 if (eventType.equals("subscriptionCreateResponse")) {
80 dmiCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
82 if (eventType.equals("subscriptionDeleteResponse")) {
83 dmiCacheHandler.removeFromDatabasePerDmi(subscriptionId, dmiPluginName);
85 handleEventsStatusPerDmi(subscriptionId, eventType);
88 if (checkStatusCodeAndMessage(CM_DATA_SUBSCRIPTION_REJECTED, dmiOutEvent.getData())) {
89 handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmSubscriptionStatus.REJECTED);
90 handleEventsStatusPerDmi(subscriptionId, eventType);
93 log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
94 dmiPluginName, dmiOutEvent.getData().getStatusMessage());
97 private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
98 final CmSubscriptionStatus cmSubscriptionStatus) {
99 dmiCacheHandler.updateDmiSubscriptionStatusPerDmi(subscriptionId, dmiPluginName,
100 cmSubscriptionStatus);
103 private void handleEventsStatusPerDmi(final String subscriptionId, final String eventType) {
104 final Map<String, DmiCmSubscriptionDetails> dmiSubscriptionsPerDmi =
105 dmiCacheHandler.get(subscriptionId);
106 final NcmpOutEvent ncmpOutEvent = mappersFacade.toNcmpOutEvent(subscriptionId,
107 dmiSubscriptionsPerDmi);
108 eventsFacade.publishNcmpOutEvent(subscriptionId, eventType,
109 ncmpOutEvent, false);
112 private boolean checkStatusCodeAndMessage(final NcmpResponseStatus ncmpResponseStatus,
113 final Data dmiOutData) {
114 return ncmpResponseStatus.getCode().equals(dmiOutData.getStatusCode())
115 && ncmpResponseStatus.getMessage()
116 .equals(dmiOutData.getStatusMessage());