57652809df7f67f0687164188ab482b702da913a
[cps/ncmp-dmi-plugin.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023 Nordix Foundation
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription;
22
23 import io.cloudevents.CloudEvent;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Set;
28 import lombok.RequiredArgsConstructor;
29 import org.apache.kafka.clients.consumer.ConsumerRecord;
30 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper;
31 import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
32 import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.Data;
33 import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
34 import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmHandle;
35 import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent;
36 import org.springframework.beans.factory.annotation.Value;
37 import org.springframework.kafka.annotation.KafkaListener;
38 import org.springframework.kafka.core.KafkaTemplate;
39 import org.springframework.stereotype.Service;
40
41 @Service
42 @RequiredArgsConstructor
43 public class CmSubscriptionDmiInEventConsumer {
44
45     @Value("${app.dmi.avc.subscription-response-topic}")
46     private String cmAvcSubscriptionResponseTopic;
47     @Value("${dmi.service.name}")
48     private String dmiName;
49     private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
50
51     /**
52      * Consume the specified event.
53      *
54      * @param cmSubscriptionDmiInCloudEvent the event to be consumed
55      */
56     @KafkaListener(topics = "${app.dmi.avc.subscription-topic}",
57             containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
58     public void consumeCmSubscriptionDmiInEvent(
59             final ConsumerRecord<String, CloudEvent> cmSubscriptionDmiInCloudEvent) {
60         final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent =
61                 CloudEventMapper.toTargetEvent(cmSubscriptionDmiInCloudEvent.value(), CmSubscriptionDmiInEvent.class);
62         if (cmSubscriptionDmiInEvent != null) {
63             final String eventKey = cmSubscriptionDmiInCloudEvent.value().getId();
64             final String subscriptionType = cmSubscriptionDmiInCloudEvent.value().getType();
65             if ("subscriptionCreated".equals(subscriptionType)) {
66                 sendCmSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus",
67                         formCmSubscriptionDmiOutEvent(cmSubscriptionDmiInEvent));
68             } else if ("subscriptionDeleted".equals(subscriptionType)) {
69                 sendCmSubscriptionDmiOutEvent(eventKey, "subscriptionDeletedStatus",
70                         formCmSubscriptionDmiOutEvent(cmSubscriptionDmiInEvent));
71             }
72         }
73     }
74
75     /**
76      * Sends message to the configured topic.
77      *
78      * @param eventKey                  is the kafka message key
79      * @param subscriptionType          is the type of subscription action
80      * @param cmSubscriptionDmiOutEvent is the payload of the kafka message
81      */
82     public void sendCmSubscriptionDmiOutEvent(final String eventKey, final String subscriptionType,
83             final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent) {
84         cloudEventKafkaTemplate.send(cmAvcSubscriptionResponseTopic, eventKey,
85                 CmSubscriptionDmiOutEventToCloudEventMapper.toCloudEvent(cmSubscriptionDmiOutEvent, subscriptionType,
86                         dmiName));
87     }
88
89     private CmSubscriptionDmiOutEvent formCmSubscriptionDmiOutEvent(
90             final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent) {
91         final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent = new CmSubscriptionDmiOutEvent();
92         final Data cmSubscriptionDmiOutEventData = new Data();
93         cmSubscriptionDmiOutEventData.setClientId(cmSubscriptionDmiInEvent.getData().getSubscription().getClientID());
94         cmSubscriptionDmiOutEventData.setSubscriptionName(
95                 cmSubscriptionDmiInEvent.getData().getSubscription().getName());
96         cmSubscriptionDmiOutEventData.setDmiName(dmiName);
97
98         final List<CmHandle> cmHandles = cmSubscriptionDmiInEvent.getData().getPredicates().getTargets();
99         cmSubscriptionDmiOutEventData.setSubscriptionStatus(populateSubscriptionStatus(extractCmHandleIds(cmHandles)));
100         cmSubscriptionDmiOutEvent.setData(cmSubscriptionDmiOutEventData);
101         return cmSubscriptionDmiOutEvent;
102     }
103
104     private Set<String> extractCmHandleIds(final List<CmHandle> cmHandles) {
105         final Set<String> cmHandleIds = new HashSet<>();
106
107         for (final CmHandle cmHandle : cmHandles) {
108             cmHandleIds.add(cmHandle.getId());
109         }
110         return cmHandleIds;
111     }
112
113     private List<SubscriptionStatus> populateSubscriptionStatus(final Set<String> cmHandleIds) {
114         final List<SubscriptionStatus> subscriptionStatuses = new ArrayList<>();
115         for (final String cmHandleId : cmHandleIds) {
116             final SubscriptionStatus status = new SubscriptionStatus();
117             status.setId(cmHandleId);
118             status.setStatus(SubscriptionStatus.Status.ACCEPTED);
119             subscriptionStatuses.add(status);
120         }
121         return subscriptionStatuses;
122     }
123
124 }