2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2024 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription;
23 import io.cloudevents.CloudEvent;
24 import lombok.RequiredArgsConstructor;
25 import org.apache.kafka.clients.consumer.ConsumerRecord;
26 import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus;
27 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper;
28 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
29 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data;
30 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
31 import org.springframework.beans.factory.annotation.Value;
32 import org.springframework.kafka.annotation.KafkaListener;
33 import org.springframework.kafka.core.KafkaTemplate;
34 import org.springframework.stereotype.Service;
37 @RequiredArgsConstructor
38 public class CmNotificationSubscriptionDmiInEventConsumer {
41 @Value("${app.dmi.avc.subscription-response-topic}")
42 private String cmNotificationSubscriptionResponseTopic;
43 @Value("${dmi.service.name}")
44 private String dmiName;
45 private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
48 * Consume the cmNotificationSubscriptionDmiInCloudEvent event.
50 * @param cmNotificationSubscriptionDmiInCloudEvent the event to be consumed
52 @KafkaListener(topics = "${app.dmi.avc.subscription-topic}",
53 containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
54 public void consumeCmNotificationSubscriptionDmiInEvent(
55 final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiInCloudEvent) {
56 final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
57 CloudEventMapper.toTargetEvent(cmNotificationSubscriptionDmiInCloudEvent.value(),
58 CmNotificationSubscriptionDmiInEvent.class);
59 if (cmNotificationSubscriptionDmiInEvent != null) {
60 final String subscriptionId = cmNotificationSubscriptionDmiInCloudEvent.value().getId();
61 final String subscriptionType = cmNotificationSubscriptionDmiInCloudEvent.value().getType();
62 final String correlationId = String.valueOf(cmNotificationSubscriptionDmiInCloudEvent.value()
63 .getExtension("correlationid"));
65 if ("subscriptionCreated".equals(subscriptionType)) {
66 createAndSendCmNotificationSubscriptionDmiOutEvent(subscriptionId, "subscriptionCreateResponse",
67 correlationId, CmNotificationSubscriptionStatus.ACCEPTED);
68 } else if ("subscriptionDeleted".equals(subscriptionType)) {
69 createAndSendCmNotificationSubscriptionDmiOutEvent(subscriptionId, "subscriptionDeleteResponse",
70 correlationId, CmNotificationSubscriptionStatus.ACCEPTED);
76 * Create Dmi out event object and send to response topic.
78 * @param eventKey the events key
79 * @param subscriptionType the subscriptions type
80 * @param correlationId the events correlation Id
81 * @param cmNotificationSubscriptionStatus subscriptions status accepted/rejected
83 public void createAndSendCmNotificationSubscriptionDmiOutEvent(
84 final String eventKey, final String subscriptionType, final String correlationId,
85 final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
87 final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent =
88 new CmNotificationSubscriptionDmiOutEvent();
89 final Data cmNotificationSubscriptionDmiOutEventData = new Data();
91 if (cmNotificationSubscriptionStatus.equals(CmNotificationSubscriptionStatus.ACCEPTED)) {
92 cmNotificationSubscriptionDmiOutEventData.setStatusCode("1");
93 cmNotificationSubscriptionDmiOutEventData.setStatusMessage("ACCEPTED");
95 cmNotificationSubscriptionDmiOutEventData.setStatusCode("2");
96 cmNotificationSubscriptionDmiOutEventData.setStatusMessage("REJECTED");
98 cmNotificationSubscriptionDmiOutEvent.setData(cmNotificationSubscriptionDmiOutEventData);
100 cloudEventKafkaTemplate.send(cmNotificationSubscriptionResponseTopic, eventKey,
101 CmNotificationSubscriptionDmiOutEventToCloudEventMapper.toCloudEvent(cmNotificationSubscriptionDmiOutEvent,
102 subscriptionType, dmiName, correlationId));