import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.cps.ncmp.api.impl.subscriptions.SubscriptionPersistence;
import org.onap.cps.ncmp.api.impl.yangmodels.YangModelSubscriptionEvent;
import org.onap.cps.ncmp.event.model.InnerSubscriptionEvent;
/**
* Consume the specified event.
*
- * @param subscriptionEvent the event to be consumed
+ * @param subscriptionEventConsumerRecord the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.subscription-topic}",
properties = {"spring.json.value.default.type=org.onap.cps.ncmp.event.model.SubscriptionEvent"})
- public void consumeSubscriptionEvent(final SubscriptionEvent subscriptionEvent) {
+ public void consumeSubscriptionEvent(
+ final ConsumerRecord<String, SubscriptionEvent> subscriptionEventConsumerRecord) {
+ final SubscriptionEvent subscriptionEvent = subscriptionEventConsumerRecord.value();
final InnerSubscriptionEvent event = subscriptionEvent.getEvent();
final String eventDatastore = event.getPredicates().getDatastore();
if (!(eventDatastore.equals("passthrough-running") || eventDatastore.equals("passthrough-operational"))) {
event.getSubscription().getClientID(),
event.getSubscription().getName());
if (notificationFeatureEnabled) {
- subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent);
+ subscriptionEventForwarder.forwardCreateSubscriptionEvent(subscriptionEvent,
+ subscriptionEventConsumerRecord.headers());
}
}
} else {