ecfef6f3015066ff496f83aaa15d86ac109c8218
[cps/ncmp-dmi-plugin.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2024 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription;
22
23 import io.cloudevents.CloudEvent;
24 import lombok.RequiredArgsConstructor;
25 import org.apache.kafka.clients.consumer.ConsumerRecord;
26 import org.onap.cps.ncmp.dmi.notifications.cmsubscription.model.CmNotificationSubscriptionStatus;
27 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper;
28 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.CmNotificationSubscriptionDmiOutEvent;
29 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.dmi_to_ncmp.Data;
30 import org.onap.cps.ncmp.events.cmnotificationsubscription_merge1_0_0.ncmp_to_dmi.CmNotificationSubscriptionDmiInEvent;
31 import org.springframework.beans.factory.annotation.Value;
32 import org.springframework.kafka.annotation.KafkaListener;
33 import org.springframework.kafka.core.KafkaTemplate;
34 import org.springframework.stereotype.Service;
35
36 @Service
37 @RequiredArgsConstructor
38 public class CmNotificationSubscriptionDmiInEventConsumer {
39
40
41     @Value("${app.dmi.avc.subscription-response-topic}")
42     private String cmNotificationSubscriptionResponseTopic;
43     @Value("${dmi.service.name}")
44     private String dmiName;
45     private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
46
47     /**
48      * Consume the cmNotificationSubscriptionDmiInCloudEvent event.
49      *
50      * @param cmNotificationSubscriptionDmiInCloudEvent the event to be consumed
51      */
52     @KafkaListener(topics = "${app.dmi.avc.subscription-topic}",
53         containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
54     public void consumeCmNotificationSubscriptionDmiInEvent(
55         final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiInCloudEvent) {
56         final CmNotificationSubscriptionDmiInEvent cmNotificationSubscriptionDmiInEvent =
57             CloudEventMapper.toTargetEvent(cmNotificationSubscriptionDmiInCloudEvent.value(),
58                 CmNotificationSubscriptionDmiInEvent.class);
59         if (cmNotificationSubscriptionDmiInEvent != null) {
60             final String subscriptionId = cmNotificationSubscriptionDmiInCloudEvent.value().getId();
61             final String subscriptionType = cmNotificationSubscriptionDmiInCloudEvent.value().getType();
62             final String correlationId = String.valueOf(cmNotificationSubscriptionDmiInCloudEvent.value()
63                 .getExtension("correlationid"));
64
65             if ("subscriptionCreated".equals(subscriptionType)) {
66                 createAndSendCmNotificationSubscriptionDmiOutEvent(subscriptionId, "subscriptionCreateResponse",
67                     correlationId, CmNotificationSubscriptionStatus.ACCEPTED);
68             } else if ("subscriptionDeleted".equals(subscriptionType)) {
69                 createAndSendCmNotificationSubscriptionDmiOutEvent(subscriptionId, "subscriptionDeleteResponse",
70                     correlationId, CmNotificationSubscriptionStatus.ACCEPTED);
71             }
72         }
73     }
74
75     /**
76      * Create Dmi out event object and send to response topic.
77      *
78      * @param eventKey the events key
79      * @param subscriptionType the subscriptions type
80      * @param correlationId the events correlation Id
81      * @param cmNotificationSubscriptionStatus subscriptions status accepted/rejected
82      */
83     public void createAndSendCmNotificationSubscriptionDmiOutEvent(
84         final String eventKey, final String subscriptionType, final String correlationId,
85         final CmNotificationSubscriptionStatus cmNotificationSubscriptionStatus) {
86
87         final CmNotificationSubscriptionDmiOutEvent cmNotificationSubscriptionDmiOutEvent =
88             new CmNotificationSubscriptionDmiOutEvent();
89         final Data cmNotificationSubscriptionDmiOutEventData = new Data();
90
91         if (cmNotificationSubscriptionStatus.equals(CmNotificationSubscriptionStatus.ACCEPTED)) {
92             cmNotificationSubscriptionDmiOutEventData.setStatusCode("1");
93             cmNotificationSubscriptionDmiOutEventData.setStatusMessage("ACCEPTED");
94         } else if (cmNotificationSubscriptionStatus.equals(CmNotificationSubscriptionStatus.REJECTED)) {
95             cmNotificationSubscriptionDmiOutEventData.setStatusCode("2");
96             cmNotificationSubscriptionDmiOutEventData.setStatusMessage("REJECTED");
97         }
98         cmNotificationSubscriptionDmiOutEvent.setData(cmNotificationSubscriptionDmiOutEventData);
99
100         cloudEventKafkaTemplate.send(cmNotificationSubscriptionResponseTopic, eventKey,
101             CmNotificationSubscriptionDmiOutEventToCloudEventMapper.toCloudEvent(cmNotificationSubscriptionDmiOutEvent,
102                 subscriptionType, dmiName, correlationId));
103
104     }
105
106
107
108 }