2  * ============LICENSE_START=======================================================
 
   3  *  Copyright (C) 2022-2023 Nordix Foundation
 
   4  *  ================================================================================
 
   5  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   6  *  you may not use this file except in compliance with the License.
 
   7  *  You may obtain a copy of the License at
 
   9  *        http://www.apache.org/licenses/LICENSE-2.0
 
  11  *  Unless required by applicable law or agreed to in writing, software
 
  12  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  *  See the License for the specific language governing permissions and
 
  15  *  limitations under the License.
 
  17  *  SPDX-License-Identifier: Apache-2.0
 
  18  *  ============LICENSE_END=========================================================
 
  21 package org.onap.cps.ncmp.api.impl.events.deprecated.cmsubscription;
 
  23 import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
 
  24 import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL;
 
  25 import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING;
 
  27 import io.cloudevents.CloudEvent;
 
  28 import lombok.RequiredArgsConstructor;
 
  29 import lombok.extern.slf4j.Slf4j;
 
  30 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
  31 import org.onap.cps.ncmp.api.impl.deprecated.subscriptions.SubscriptionPersistence;
 
  32 import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
 
  33 import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
 
  34 import org.springframework.beans.factory.annotation.Value;
 
  35 import org.springframework.kafka.annotation.KafkaListener;
 
  36 import org.springframework.stereotype.Component;
 
  41 @RequiredArgsConstructor
 
  42 public class CmSubscriptionNcmpInEventConsumer {
 
  44     private final CmSubscriptionNcmpInEventForwarder cmSubscriptionNcmpInEventForwarder;
 
  45     private final CmSubscriptionNcmpInEventMapper cmSubscriptionNcmpInEventMapper;
 
  46     private final SubscriptionPersistence subscriptionPersistence;
 
  48     @Value("${notification.enabled:true}")
 
  49     private boolean notificationFeatureEnabled;
 
  51     @Value("${ncmp.model-loader.subscription:false}")
 
  52     private boolean subscriptionModelLoaderEnabled;
 
  55      * Consume the specified event.
 
  57      * @param subscriptionEventConsumerRecord the event to be consumed
 
  59     @KafkaListener(topics = "${app.ncmp.avc.subscription-topic}",
 
  60             containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
 
  61     public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
 
  62         final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
 
  63         final String eventType = subscriptionEventConsumerRecord.value().getType();
 
  64         final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent =
 
  65                 toTargetEvent(cloudEvent, CmSubscriptionNcmpInEvent.class);
 
  66         final String eventDatastore = cmSubscriptionNcmpInEvent.getData().getPredicates().getDatastore();
 
  67         if (!eventDatastore.equals(PASSTHROUGH_RUNNING.getDatastoreName()) || eventDatastore.equals(
 
  68                 PASSTHROUGH_OPERATIONAL.getDatastoreName())) {
 
  69             throw new UnsupportedOperationException(
 
  70                     "passthrough datastores are currently only supported for event subscriptions");
 
  72         if ("CM".equals(cmSubscriptionNcmpInEvent.getData().getDataType().getDataCategory())) {
 
  73             if (subscriptionModelLoaderEnabled) {
 
  74                 persistSubscriptionEvent(cmSubscriptionNcmpInEvent);
 
  76             if ("subscriptionCreated".equals(cloudEvent.getType())) {
 
  77                 log.info("Subscription for ClientID {} with name {} ...",
 
  78                         cmSubscriptionNcmpInEvent.getData().getSubscription().getClientID(),
 
  79                         cmSubscriptionNcmpInEvent.getData().getSubscription().getName());
 
  80                 if (notificationFeatureEnabled) {
 
  81                     cmSubscriptionNcmpInEventForwarder.forwardCreateSubscriptionEvent(cmSubscriptionNcmpInEvent,
 
  86             log.trace("Non-CM subscription event ignored");
 
  90     private void persistSubscriptionEvent(final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent) {
 
  91         final YangModelSubscriptionEvent yangModelSubscriptionEvent =
 
  92                 cmSubscriptionNcmpInEventMapper.toYangModelSubscriptionEvent(cmSubscriptionNcmpInEvent);
 
  93         subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent);