2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.api.impl.events.cmsubscription.consumer;
23 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
25 import io.cloudevents.CloudEvent;
26 import lombok.RequiredArgsConstructor;
27 import lombok.extern.slf4j.Slf4j;
28 import org.apache.kafka.clients.consumer.ConsumerRecord;
29 import org.onap.cps.ncmp.api.impl.events.cmsubscription.DmiCmNotificationSubscriptionCacheHandler;
30 import org.onap.cps.ncmp.api.impl.events.cmsubscription.model.CmNotificationSubscriptionStatus;
31 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
32 import org.springframework.kafka.annotation.KafkaListener;
33 import org.springframework.stereotype.Component;
37 @RequiredArgsConstructor
38 public class CmNotificationSubscriptionDmiOutEventConsumer {
40 private final DmiCmNotificationSubscriptionCacheHandler dmiCmNotificationSubscriptionCacheHandler;
43 * Consume the Cm Notification Subscription event from the dmi-plugin.
45 * @param cmNotificationSubscriptionDmiOutEventConsumerRecord the event to be consumed
47 @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
48 containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
49 public void consumeCmNotificationSubscriptionDmiOutEvent(
50 final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiOutEventConsumerRecord) {
51 final CloudEvent cloudEvent = cmNotificationSubscriptionDmiOutEventConsumerRecord.value();
52 final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent =
53 toTargetEvent(cloudEvent, CmNotificationSubscriptionDmiOutEvent.class);
54 final String correlationId = String.valueOf(cloudEvent.getExtension("correlationid"));
55 if ("subscriptionCreateResponse".equals(cloudEvent.getType()) && cmNotificationSubscriptionDmiOutEvent != null
56 && correlationId != null) {
57 handleCmSubscriptionCreate(correlationId, cmNotificationSubscriptionDmiOutEvent);
61 private void handleCmSubscriptionCreate(final String correlationId,
62 final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent) {
63 final String subscriptionId = correlationId.split("#")[0];
64 final String dmiPluginName = correlationId.split("#")[1];
66 if ("ACCEPTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
67 handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.ACCEPTED);
68 dmiCmNotificationSubscriptionCacheHandler.persistIntoDatabasePerDmi(subscriptionId, dmiPluginName);
71 if ("REJECTED".equals(cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage())) {
72 handleCacheStatusPerDmi(subscriptionId, dmiPluginName, CmNotificationSubscriptionStatus.REJECTED);
75 log.info("Cm Subscription with id : {} handled by the dmi-plugin : {} has the status : {}", subscriptionId,
76 dmiPluginName, cmNotificationSubscriptionDmiOutEvent.getData().getStatusMessage());
79 private void handleCacheStatusPerDmi(final String subscriptionId, final String dmiPluginName,
80 final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
81 dmiCmNotificationSubscriptionCacheHandler.updateDmiCmNotificationSubscriptionStatusPerDmi(subscriptionId,
82 dmiPluginName, cmNotificationSubscriptionStatus);