/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2022-2023 Nordix Foundation
+ * Copyright (C) 2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.ncmp.api.impl.events.cmsubscription;
import static org.onap.cps.ncmp.api.impl.events.mapper.CloudEventMapper.toTargetEvent;
-import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_OPERATIONAL;
-import static org.onap.cps.ncmp.api.impl.operations.DatastoreType.PASSTHROUGH_RUNNING;
import io.cloudevents.CloudEvent;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
-import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.events.cmsubscription1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
+import org.onap.cps.ncmp.events.cmsubscription_merge1_0_0.client_to_ncmp.CmSubscriptionNcmpInEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
-
@Component
@Slf4j
-@RequiredArgsConstructor
public class CmSubscriptionNcmpInEventConsumer {
- private final CmSubscriptionNcmpInEventForwarder cmSubscriptionNcmpInEventForwarder;
- private final CmSubscriptionNcmpInEventMapper cmSubscriptionNcmpInEventMapper;
- private final SubscriptionPersistence subscriptionPersistence;
-
@Value("${notification.enabled:true}")
private boolean notificationFeatureEnabled;
* @param subscriptionEventConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-topic}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
- final String eventType = subscriptionEventConsumerRecord.value().getType();
final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent =
- toTargetEvent(cloudEvent, CmSubscriptionNcmpInEvent.class);
- final String eventDatastore = cmSubscriptionNcmpInEvent.getData().getPredicates().getDatastore();
- if (!eventDatastore.equals(PASSTHROUGH_RUNNING.getDatastoreName()) || eventDatastore.equals(
- PASSTHROUGH_OPERATIONAL.getDatastoreName())) {
- throw new UnsupportedOperationException(
- "passthrough datastores are currently only supported for event subscriptions");
+ toTargetEvent(cloudEvent, CmSubscriptionNcmpInEvent.class);
+ if (subscriptionModelLoaderEnabled) {
+ log.info("Subscription with name {} to be mapped to hazelcast object...",
+ cmSubscriptionNcmpInEvent.getData().getSubscriptionId());
}
- if ("CM".equals(cmSubscriptionNcmpInEvent.getData().getDataType().getDataCategory())) {
- if (subscriptionModelLoaderEnabled) {
- persistSubscriptionEvent(cmSubscriptionNcmpInEvent);
- }
- if ("subscriptionCreated".equals(cloudEvent.getType())) {
- log.info("Subscription for ClientID {} with name {} ...",
- cmSubscriptionNcmpInEvent.getData().getSubscription().getClientID(),
- cmSubscriptionNcmpInEvent.getData().getSubscription().getName());
- if (notificationFeatureEnabled) {
- cmSubscriptionNcmpInEventForwarder.forwardCreateSubscriptionEvent(cmSubscriptionNcmpInEvent,
- eventType);
- }
- }
- } else {
- log.trace("Non-CM subscription event ignored");
+ if ("subscriptionCreated".equals(cloudEvent.getType()) && cmSubscriptionNcmpInEvent != null) {
+ log.info("Subscription for ClientID {} with name {} ...",
+ cloudEvent.getSource(),
+ cmSubscriptionNcmpInEvent.getData().getSubscriptionId());
}
}
-
- private void persistSubscriptionEvent(final CmSubscriptionNcmpInEvent cmSubscriptionNcmpInEvent) {
- final YangModelSubscriptionEvent yangModelSubscriptionEvent =
- cmSubscriptionNcmpInEventMapper.toYangModelSubscriptionEvent(cmSubscriptionNcmpInEvent);
- subscriptionPersistence.saveSubscriptionEvent(yangModelSubscriptionEvent);
- }
-
}