/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
+ * Copyright (c) 2024-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
+ * distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.onap.cps.ncmp.impl.cmnotificationsubscription.ncmp;
-import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
-
-import io.cloudevents.CloudEvent;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent;
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.Predicate;
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent;
+import org.onap.cps.ncmp.impl.utils.JexParser;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class NcmpInEventConsumer {
- private final CmSubscriptionHandler cmSubscriptionHandler;
-
/**
* Consume the specified event.
*
- * @param ncmpInEventAsConsumerRecord the event to be consumed
+ * @param dataJobSubscriptionOperationInEvent the event to be consumed
*/
@KafkaListener(topics = "${app.ncmp.avc.cm-subscription-ncmp-in}",
- containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
- public void consumeSubscriptionEvent(final ConsumerRecord<String, CloudEvent> ncmpInEventAsConsumerRecord) {
- final CloudEvent cloudEvent = ncmpInEventAsConsumerRecord.value();
- final NcmpInEvent ncmpInEvent = toTargetEvent(cloudEvent, NcmpInEvent.class);
- if (ncmpInEvent != null) {
- log.info("Subscription with name {} to be mapped to hazelcast object...",
- ncmpInEvent.getData().getSubscriptionId());
+ containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory",
+ properties = {"spring.json.value.default.type="
+ + "org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0"
+ + ".client_to_ncmp.DataJobSubscriptionOperationInEvent"})
+ public void consumeSubscriptionEvent(
+ final DataJobSubscriptionOperationInEvent dataJobSubscriptionOperationInEvent) {
+
+ final String eventType = dataJobSubscriptionOperationInEvent.getEventType();
+ final String dataNodeSelector = dataJobSubscriptionOperationInEvent.getEvent().getDataJob()
+ .getProductionJobDefinition().getTargetSelector().getDataNodeSelector();
+ final List<String> fdns = JexParser.extractFdnsFromLocationPaths(dataNodeSelector);
+ final String dataJobId = dataJobSubscriptionOperationInEvent.getEvent().getDataJob().getId();
+ final String dataTypeId = dataJobSubscriptionOperationInEvent.getEvent().getDataType() != null
+ ? dataJobSubscriptionOperationInEvent.getEvent().getDataType().getDataTypeId() : "UNKNOWN";
- final String subscriptionId = ncmpInEvent.getData().getSubscriptionId();
- final List<Predicate> predicates = ncmpInEvent.getData().getPredicates();
- if ("subscriptionCreateRequest".equals(cloudEvent.getType())) {
- log.info("Subscription create request for source {} with subscription id {} ...",
- cloudEvent.getSource(), subscriptionId);
- cmSubscriptionHandler.processSubscriptionCreateRequest(subscriptionId, predicates);
- }
- if ("subscriptionDeleteRequest".equals(cloudEvent.getType())) {
- log.info("Subscription delete request for source {} with subscription id {} ...",
- cloudEvent.getSource(), subscriptionId);
- cmSubscriptionHandler.processSubscriptionDeleteRequest(subscriptionId);
- }
- }
+ log.info("Consumed subscription event with details: | jobId={} | eventType={} | fdns={} | dataType={}",
+ dataJobId, eventType, fdns, dataTypeId);
}
}
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import com.fasterxml.jackson.databind.ObjectMapper
-import io.cloudevents.CloudEvent
-import io.cloudevents.core.builder.CloudEventBuilder
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.NcmpInEvent
+import org.onap.cps.ncmp.impl.cmnotificationsubscription_1_0_0.client_to_ncmp.DataJobSubscriptionOperationInEvent
import org.onap.cps.ncmp.utils.TestUtils
-import org.onap.cps.ncmp.utils.events.MessagingBaseSpec
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
+import spock.lang.Specification
@SpringBootTest(classes = [ObjectMapper, JsonObjectMapper])
-class NcmpInEventConsumerSpec extends MessagingBaseSpec {
+class NcmpInEventConsumerSpec extends Specification {
- def mockCmSubscriptionHandler = Mock(CmSubscriptionHandler)
- def objectUnderTest = new NcmpInEventConsumer(mockCmSubscriptionHandler)
- def logger = Spy(ListAppender<ILoggingEvent>)
+ def objectUnderTest = new NcmpInEventConsumer()
+ def logger = new ListAppender<ILoggingEvent>()
@Autowired
JsonObjectMapper jsonObjectMapper
((Logger) LoggerFactory.getLogger(NcmpInEventConsumer.class)).detachAndStopAllAppenders()
}
-
- def 'Consume valid CmNotificationSubscriptionNcmpInEvent create message.'() {
- given: 'a cmNotificationSubscription event'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent)
- def testCloudEventSent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(testEventSent))
- .withId('subscriptionCreated')
- .withType('subscriptionCreateRequest')
- .withSource(URI.create('some-resource'))
- .withExtension('correlationid', 'test-cmhandle1').build()
- def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
- when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(consumerRecord)
- then: 'an event is logged with level INFO'
- def loggingEvent = getLoggingEvent()
+ def 'Consuming CM Data Notification #scenario data type id.'() {
+ given: 'a JSON file containing a subscription event'
+ def jsonData = TestUtils.getResourceFileContent('sample_dataJobSubscriptionInEvent.json')
+ jsonData = jsonData.replace('#dataTypeId', dataTypeId)
+ def event = objectMapper.readValue(jsonData, DataJobSubscriptionOperationInEvent)
+ when: 'the event is consumed'
+ objectUnderTest.consumeSubscriptionEvent(event)
+ then: 'event details are logged at level INFO'
+ def loggingEvent = logger.list.last()
assert loggingEvent.level == Level.INFO
- and: 'the log indicates the task completed successfully'
- assert loggingEvent.formattedMessage == 'Subscription create request for source some-resource with subscription id test-id ...'
- and: 'the subscription handler service is called once'
- 1 * mockCmSubscriptionHandler.processSubscriptionCreateRequest('test-id',_)
+ assert loggingEvent.formattedMessage.contains('jobId=my job id')
+ assert loggingEvent.formattedMessage.contains('eventType=my event type')
+ assert loggingEvent.formattedMessage.contains("dataType=${dataTypeId}")
+ assert loggingEvent.formattedMessage.contains('fdns=[/SubNetwork=SN1]')
+ where: 'the following data type ids are used'
+ scenario | dataTypeId
+ 'with' | 'my data type'
+ 'without' | 'null'
}
-
- def 'Consume valid CmNotificationSubscriptionNcmpInEvent delete message.'() {
- given: 'a cmNotificationSubscription event'
- def jsonData = TestUtils.getResourceFileContent('cmSubscription/cmNotificationSubscriptionNcmpInEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, NcmpInEvent)
- def testCloudEventSent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(testEventSent))
- .withId('sub-id')
- .withType('subscriptionDeleteRequest')
- .withSource(URI.create('some-resource'))
- .withExtension('correlationid', 'test-cmhandle1').build()
- def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
- when: 'the valid event is consumed'
- objectUnderTest.consumeSubscriptionEvent(consumerRecord)
- then: 'an event is logged with level INFO'
- def loggingEvent = getLoggingEvent()
- assert loggingEvent.level == Level.INFO
- and: 'the log indicates the task completed successfully'
- assert loggingEvent.formattedMessage == 'Subscription delete request for source some-resource with subscription id test-id ...'
- and: 'the subscription handler service is called once'
- 1 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest('test-id')
- }
-
- def 'Attempt to consume unsupported Event.'() {
- given: 'a unsupported event with a valid supported type'
- def unsupportedEvent = Mock(CloudEvent)
- def cloudEventWithUnsupportedEvent = CloudEventBuilder.v1()
- .withId('some id')
- .withType('subscriptionCreateRequest') // this is valid but does not match the event object
- .withSource(URI.create('some-resource'))
- .withData(objectMapper.writeValueAsBytes(unsupportedEvent)).build()
- def consumerRecord = new ConsumerRecord<String, CloudEvent>('some topic', 0, 0, 'some key', cloudEventWithUnsupportedEvent)
- when: 'attempt to consume the unsupported event'
- objectUnderTest.consumeSubscriptionEvent(consumerRecord)
- then: 'the subscription handler service is not called at all'
- 0 * mockCmSubscriptionHandler.processSubscriptionDeleteRequest(*_)
- 0 * mockCmSubscriptionHandler.processSubscriptionCreateRequest(*_)
- }
-
- def getLoggingEvent() {
- return logger.list[1]
- }
-
}