2  * ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2023 Nordix Foundation
 
   4  *  ================================================================================
 
   5  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   6  *  you may not use this file except in compliance with the License.
 
   7  *  You may obtain a copy of the License at
 
   9  *        http://www.apache.org/licenses/LICENSE-2.0
 
  11  *  Unless required by applicable law or agreed to in writing, software
 
  12  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  *  See the License for the specific language governing permissions and
 
  15  *  limitations under the License.
 
  17  *  SPDX-License-Identifier: Apache-2.0
 
  18  *  ============LICENSE_END=========================================================
 
  21 package org.onap.cps.ncmp.api.impl.events.deprecated.cmsubscription;
 
  23 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
 
  25 import com.hazelcast.map.IMap;
 
  26 import io.cloudevents.CloudEvent;
 
  28 import java.util.concurrent.TimeUnit;
 
  29 import lombok.RequiredArgsConstructor;
 
  30 import lombok.extern.slf4j.Slf4j;
 
  31 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
  32 import org.onap.cps.ncmp.api.impl.config.embeddedcache.ForwardedSubscriptionEventCacheConfig;
 
  33 import org.onap.cps.ncmp.api.impl.deprecated.subscriptions.SubscriptionPersistence;
 
  34 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
 
  35 import org.onap.cps.ncmp.api.models.CmSubscriptionEvent;
 
  36 import org.onap.cps.ncmp.events.cmsubscription1_0_0.dmi_to_ncmp.CmSubscriptionDmiOutEvent;
 
  37 import org.springframework.beans.factory.annotation.Value;
 
  38 import org.springframework.kafka.annotation.KafkaListener;
 
  39 import org.springframework.stereotype.Component;
 
  43 @RequiredArgsConstructor
 
  44 public class CmSubscriptionDmiOutEventConsumer {
 
  // Cache of forwarded subscription events. Keys are the client id concatenated with the
  // subscription name; each value is a set of DMI names from which a DMI is removed when
  // its response is consumed.
  46     private final IMap<String, Set<String>> forwardedSubscriptionEventCache;

  // Used to save the YangModelSubscriptionEvent built from a consumed DMI response.
  47     private final SubscriptionPersistence subscriptionPersistence;

  // Converts a CmSubscriptionDmiOutEvent into a YangModelSubscriptionEvent.
  48     private final CmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper

  49             cmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper;

  // Sends the outcome CmSubscriptionEvent together with the incoming event type.
  50     private final CmSubscriptionNcmpOutEventPublisher cmSubscriptionNcmpOutEventPublisher;

  // Bound to property 'notification.enabled' (defaults to true); the outcome response is
  // only sent when this flag is set.
  52     @Value("${notification.enabled:true}")

  53     private boolean notificationFeatureEnabled;

  // Bound to property 'ncmp.model-loader.subscription' (defaults to false); the response is
  // only persisted when this flag is set.
  55     @Value("${ncmp.model-loader.subscription:false}")

  56     private boolean subscriptionModelLoaderEnabled;
 
  59      * Consume subscription response event.
 
  61      * @param cmSubscriptionDmiOutConsumerRecord the event to be consumed
 
  63     @KafkaListener(topics = "${app.ncmp.avc.subscription-response-topic}",
 
  64             containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
 
  65     public void consumeDmiOutEvent(
 
  66             final ConsumerRecord<String, CloudEvent> cmSubscriptionDmiOutConsumerRecord) {
 
  67         final CloudEvent cloudEvent = cmSubscriptionDmiOutConsumerRecord.value();
 
  68         final String eventType = cmSubscriptionDmiOutConsumerRecord.value().getType();
 
  69         final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent =
 
  70                 toTargetEvent(cloudEvent, CmSubscriptionDmiOutEvent.class);
 
  71         final String clientId = cmSubscriptionDmiOutEvent.getData().getClientId();
 
  72         log.info("subscription event response of clientId: {} is received.", clientId);
 
  73         final String subscriptionName = cmSubscriptionDmiOutEvent.getData().getSubscriptionName();
 
  74         final String subscriptionEventId = clientId + subscriptionName;
 
  75         boolean createOutcomeResponse = true;
 
  76         if (forwardedSubscriptionEventCache.containsKey(subscriptionEventId)) {
 
  77             final Set<String> dmiNames = forwardedSubscriptionEventCache.get(subscriptionEventId);
 
  78             dmiNames.remove(cmSubscriptionDmiOutEvent.getData().getDmiName());
 
  79             forwardedSubscriptionEventCache.put(subscriptionEventId, dmiNames,
 
  80                     ForwardedSubscriptionEventCacheConfig.SUBSCRIPTION_FORWARD_STARTED_TTL_SECS, TimeUnit.SECONDS);
 
  81             createOutcomeResponse = forwardedSubscriptionEventCache.get(subscriptionEventId).isEmpty();
 
  83         if (subscriptionModelLoaderEnabled) {
 
  84             updateSubscriptionEvent(cmSubscriptionDmiOutEvent);
 
  86         if (createOutcomeResponse
 
  87                 && notificationFeatureEnabled) {
 
  89             final CmSubscriptionEvent cmSubscriptionEvent = new CmSubscriptionEvent();
 
  90             cmSubscriptionEvent.setClientId(cmSubscriptionDmiOutEvent.getData().getClientId());
 
  91             cmSubscriptionEvent.setSubscriptionName(cmSubscriptionDmiOutEvent.getData().getSubscriptionName());
 
  93             cmSubscriptionNcmpOutEventPublisher.sendResponse(cmSubscriptionEvent, eventType);
 
  94             forwardedSubscriptionEventCache.remove(subscriptionEventId);
 
  98     private void updateSubscriptionEvent(final CmSubscriptionDmiOutEvent cmSubscriptionDmiOutEvent) {
 
  99         final YangModelSubscriptionEvent yangModelSubscriptionEvent =
 
 100                 cmSubscriptionDmiOutEventToYangModelSubscriptionEventMapper
 
 101                         .toYangModelSubscriptionEvent(cmSubscriptionDmiOutEvent);
 
 102         subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent);