package org.onap.cps.ncmp.api.impl.events.avcsubscription;
+import io.cloudevents.CloudEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
+import org.onap.cps.ncmp.api.impl.utils.SubscriptionEventCloudMapper;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
-import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent;
-import org.onap.cps.ncmp.event.model.SubscriptionEvent;
-import org.onap.cps.spi.exceptions.OperationNotYetSupportedException;
+import org.onap.cps.ncmp.events.avcsubscription1_0_0.client_to_ncmp.SubscriptionEvent;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
* @param subscriptionEventConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-topic}",
- properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"})
- public void consumeSubscriptionEvent(
- final ConsumerRecord<String, SubscriptionEvent> subscriptionEventConsumerRecord) {
- final SubscriptionEvent subscriptionEvent = subscriptionEventConsumerRecord.value();
- final InnerSubscriptionEvent event = subscriptionEvent.getEvent();
- final String eventDatastore = event.getPredicates().getDatastore();
+ containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
+ public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> subscriptionEventConsumerRecord) {
+ final CloudEvent cloudEvent = subscriptionEventConsumerRecord.value();
+ final String eventType = subscriptionEventConsumerRecord.value().getType();
+ final SubscriptionEvent subscriptionEvent = SubscriptionEventCloudMapper.toSubscriptionEvent(cloudEvent);
+ final String eventDatastore = subscriptionEvent.getData().getPredicates().getDatastore();
if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) {
- throw new OperationNotYetSupportedException(
+ throw new UnsupportedOperationException(
"passthrough datastores are currently only supported for event subscriptions");
}
- if ("CM".equals(event.getDataType().getDataCategory())) {
- log.debug("Consuming event {} ...", subscriptionEvent);
+ if ("CM".equals(subscriptionEvent.getData().getDataType().getDataCategory())) {
if (subscriptionModelLoaderEnabled) {
persistSubscriptionEvent(subscriptionEvent);
}
- if ("CREATE".equals(subscriptionEvent.getEventType().value())) {
+ if ("subscriptionCreated".equals(cloudEvent.getType())) {
log.info("Subscription for ClientID {} with name {} ...",
- event.getSubscription().getClientID(),
- event.getSubscription().getName());
+ subscriptionEvent.getData().getSubscription().getClientID(),
+ subscriptionEvent.getData().getSubscription().getName());
if (notificationFeatureEnabled) {
- subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent,
- subscriptionEventConsumerRecord.headers());
+ subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent, eventType);
}
}
} else {