2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2023 Nordix Foundation
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.cps.ncmp.dmi.notifications.cmsubscription;
23 import io.cloudevents.CloudEvent;
24 import java.util.ArrayList;
25 import java.util.HashSet;
26 import java.util.List;
28 import lombok.RequiredArgsConstructor;
29 import org.apache.kafka.clients.consumer.ConsumerRecord;
30 import org.onap.cps.ncmp.dmi.notifications.mapper.CloudEventMapper;
31 import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
32 import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.Data;
33 import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.SubscriptionStatus;
34 import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmHandle;
35 import org.onap.cps.ncmp.events.cmsubscription1_0_0.ncmp_to_dmi.CmSubscriptionDmiInEvent;
36 import org.springframework.beans.factory.annotation.Value;
37 import org.springframework.kafka.annotation.KafkaListener;
38 import org.springframework.kafka.core.KafkaTemplate;
39 import org.springframework.stereotype.Service;
42 @RequiredArgsConstructor
43 public class CmSubscriptionDmiInEventConsumer {
45 @Value("${app.dmi.avc.subscription-response-topic}")
46 private String cmAvcSubscriptionResponseTopic;
47 @Value("${dmi.service.name}")
48 private String dmiName;
49 private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
52 * Consume the specified event.
54 * @param cmSubscriptionDmiInCloudEvent the event to be consumed
56 @KafkaListener(topics = "${app.dmi.avc.subscription-topic}",
57 containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
58 public void consumeCmSubscriptionDmiInEvent(
59 final ConsumerRecord<String, CloudEvent> cmSubscriptionDmiInCloudEvent) {
60 final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent =
61 CloudEventMapper.toTargetEvent(cmSubscriptionDmiInCloudEvent.value(), CmSubscriptionDmiInEvent.class);
62 if (cmSubscriptionDmiInEvent != null) {
63 final String eventKey = cmSubscriptionDmiInCloudEvent.value().getId();
64 final String subscriptionType = cmSubscriptionDmiInCloudEvent.value().getType();
65 if ("subscriptionCreated".equals(subscriptionType)) {
66 sendCmSubscriptionDmiOutEvent(eventKey, "subscriptionCreatedStatus",
67 formCmSubscriptionDmiOutEvent(cmSubscriptionDmiInEvent));
68 } else if ("subscriptionDeleted".equals(subscriptionType)) {
69 sendCmSubscriptionDmiOutEvent(eventKey, "subscriptionDeletedStatus",
70 formCmSubscriptionDmiOutEvent(cmSubscriptionDmiInEvent));
76 * Sends message to the configured topic.
78 * @param eventKey is the kafka message key
79 * @param subscriptionType is the type of subscription action
80 * @param cmSubscriptionDmiOutEvent is the payload of the kafka message
82 public void sendCmSubscriptionDmiOutEvent(final String eventKey, final String subscriptionType,
83 final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent) {
84 cloudEventKafkaTemplate.send(cmAvcSubscriptionResponseTopic, eventKey,
85 CmSubscriptionDmiOutEventToCloudEventMapper.toCloudEvent(cmSubscriptionDmiOutEvent, subscriptionType,
89 private CmSubscriptionDmiOutEvent formCmSubscriptionDmiOutEvent(
90 final CmSubscriptionDmiInEvent cmSubscriptionDmiInEvent) {
91 final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent = new CmSubscriptionDmiOutEvent();
92 final Data cmSubscriptionDmiOutEventData = new Data();
93 cmSubscriptionDmiOutEventData.setClientId(cmSubscriptionDmiInEvent.getData().getSubscription().getClientID());
94 cmSubscriptionDmiOutEventData.setSubscriptionName(
95 cmSubscriptionDmiInEvent.getData().getSubscription().getName());
96 cmSubscriptionDmiOutEventData.setDmiName(dmiName);
98 final List<CmHandle> cmHandles = cmSubscriptionDmiInEvent.getData().getPredicates().getTargets();
99 cmSubscriptionDmiOutEventData.setSubscriptionStatus(populateSubscriptionStatus(extractCmHandleIds(cmHandles)));
100 cmSubscriptionDmiOutEvent.setData(cmSubscriptionDmiOutEventData);
101 return cmSubscriptionDmiOutEvent;
104 private Set<String> extractCmHandleIds(final List<CmHandle> cmHandles) {
105 final Set<String> cmHandleIds = new HashSet<>();
107 for (final CmHandle cmHandle : cmHandles) {
108 cmHandleIds.add(cmHandle.getId());
113 private List<SubscriptionStatus> populateSubscriptionStatus(final Set<String> cmHandleIds) {
114 final List<SubscriptionStatus> subscriptionStatuses = new ArrayList<>();
115 for (final String cmHandleId : cmHandleIds) {
116 final SubscriptionStatus status = new SubscriptionStatus();
117 status.setId(cmHandleId);
118 status.setStatus(SubscriptionStatus.Status.ACCEPTED);
119 subscriptionStatuses.add(status);
121 return subscriptionStatuses;