package org.onap.cps.ncmp.impl.datajobs.subscription.dmi;
import static org.onap.cps.ncmp.api.NcmpResponseStatus.CM_DATA_SUBSCRIPTION_ACCEPTED;
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED;
import static org.onap.cps.ncmp.utils.events.CloudEventMapper.toTargetEvent;
import io.cloudevents.CloudEvent;
@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
public class EventConsumer {
- private final CmSubscriptionHandler cmSubscriptionHandler;
private static final String CORRELATION_ID_SEPARATOR = "#";
+ private final CmSubscriptionHandler cmSubscriptionHandler;
+
/**
* Consume the Cm Notification Subscription response event from the dmi-plugin.
*
if ("subscriptionCreateResponse".equals(eventType)) {
final CmSubscriptionStatus cmSubscriptionStatus = getCmSubscriptionStatus(dmiOutEvent);
- if (ACCEPTED.equals(cmSubscriptionStatus)) {
- cmSubscriptionHandler.updateCmSubscriptionStatus(
- subscriptionId, dmiPluginName, cmSubscriptionStatus);
- }
+ cmSubscriptionHandler.updateCmSubscriptionStatus(subscriptionId, dmiPluginName, cmSubscriptionStatus);
}
}
}
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp;
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
-
@Service
@RequiredArgsConstructor
@Slf4j
@Override
public void createSubscription(final DataSelector dataSelector,
final String subscriptionId, final List<String> dataNodeSelectors) {
- for (final String dataNodeSelector : dataNodeSelectors) {
- cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector);
+ if (cmDataJobSubscriptionPersistenceService.isNewSubscriptionId(subscriptionId)) {
+ for (final String dataNodeSelector : dataNodeSelectors) {
+ cmDataJobSubscriptionPersistenceService.add(subscriptionId, dataNodeSelector);
+ }
+ sendEventToDmis(subscriptionId,
+ cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId),
+ dataSelector, "subscriptionCreateRequest");
}
- sendEventToDmis(subscriptionId,
- cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId),
- dataSelector, "subscriptionCreateRequest");
}
@Override
final CmSubscriptionStatus cmSubscriptionStatus) {
final List<String> dataNodeSelectors =
cmDataJobSubscriptionPersistenceService.getInactiveDataNodeSelectors(subscriptionId);
+ final List<String> rejectedDataNodeSelectors = new ArrayList<>();
for (final String dataNodeSelector : dataNodeSelectors) {
final String cmHandleId = getCmHandleId(dataNodeSelector);
if (cmHandleId == null) {
- log.info("Failed to resolve cm handle ID for dataNodeSelector {}", dataNodeSelector);
- } else {
- final String resolvedDmiServiceName = getDmiServiceName(cmHandleId);
- if (resolvedDmiServiceName.equals(dmiServiceName)) {
- cmDataJobSubscriptionPersistenceService.updateCmSubscriptionStatus(dataNodeSelector,
- cmSubscriptionStatus);
+ log.info("Ignoring dataNodeSelector={} because no matching CM Handle ID found",
+ dataNodeSelector);
+ continue;
+ }
+ final String resolvedDmiServiceName = getDmiServiceName(cmHandleId);
+ if (resolvedDmiServiceName.equals(dmiServiceName)) {
+ cmDataJobSubscriptionPersistenceService.updateCmSubscriptionStatus(dataNodeSelector,
+ cmSubscriptionStatus);
+ if (cmSubscriptionStatus.equals(REJECTED)) {
+ rejectedDataNodeSelectors.add(dataNodeSelector);
}
}
+
+ }
+ if (!rejectedDataNodeSelectors.isEmpty()) {
+ logRejectedDataNodeSelectors(subscriptionId, dmiServiceName, rejectedDataNodeSelectors);
}
}
+ private static void logRejectedDataNodeSelectors(final String subscriptionId, final String dmiServiceName,
+ final List<String> rejectedDataNodeSelectors) {
+ final String dataNodeSelectorAsString =
+ JexParser.toJsonExpressionsAsString(rejectedDataNodeSelectors);
+ log.info("DMI plugin {} rejected dataNodeSelector '{}' for dataJobId:{}",
+ dmiServiceName, dataNodeSelectorAsString, subscriptionId);
+ }
+
private void sendEventToDmis(final String subscriptionId,
final List<String> dataNodeSelectors,
final DataSelector dataSelector,
package org.onap.cps.ncmp.impl.datajobs.subscription.dmi
import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi_to_ncmp.DataJobSubscriptionDmiOutEvent
import org.onap.cps.ncmp.impl.datajobs.subscription.ncmp.CmSubscriptionHandler
-import org.onap.cps.ncmp.utils.TestUtils
import org.onap.cps.utils.JsonObjectMapper
import org.slf4j.LoggerFactory
import spock.lang.Specification
((Logger) LoggerFactory.getLogger(EventConsumer.class)).detachAndStopAllAppenders()
}
- def 'Consume subscription CREATE response with status ACCEPTED from DMI Plugin'() {
+ def 'Consume subscription #scenario CREATE response from DMI Plugin'() {
given: 'a response event from DMI'
- def jsonData = TestUtils.getResourceFileContent(
- 'datajobs/subscription/cmNotificationSubscriptionDmiOutEvent.json')
- def testEventSent = jsonObjectMapper.convertJsonString(jsonData, DataJobSubscriptionDmiOutEvent.class)
- def testCloudEventSent = CloudEventBuilder.v1()
- .withData(objectMapper.writeValueAsBytes(testEventSent))
- .withId('random-uuid')
- .withType('subscriptionCreateResponse')
- .withSource(URI.create('myDmi'))
- .withExtension('correlationid', 'sub-1#myDmi').build()
- def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', testCloudEventSent)
+ def responseEvent = createResponseEventFromDmi('sub-1#myDmi', 'myDmi', statusAsString)
+ def consumerRecord = new ConsumerRecord<String, CloudEvent>('topic-name', 0, 0, 'event-key', responseEvent)
when: 'the event is consumed'
objectUnderTest.consumeDmiOutEvent(consumerRecord)
then: 'an event is logged with level INFO'
- def loggingEvent = getLoggingEvent()
+ def loggingEvent = logger.list.first()
assert loggingEvent.level == Level.INFO
and: 'the log indicates the task completed successfully'
assert loggingEvent.formattedMessage == 'Consumed DMI subscription response event with details: | correlationId=sub-1#myDmi | eventType=subscriptionCreateResponse'
and: 'the subscription handler is called to update status of subscription with correct details'
- 1 * mockCmSubscriptionHandler.updateCmSubscriptionStatus('sub-1', 'myDmi', ACCEPTED)
+ 1 * mockCmSubscriptionHandler.updateCmSubscriptionStatus('sub-1', 'myDmi', cmSubscriptionStatus)
+ where: 'the following status is used'
+ scenario | statusAsString|| cmSubscriptionStatus
+ 'ACCEPTED'| 'ACCEPTED' || ACCEPTED
+ 'REJECTED'| 'REJECTED' || REJECTED
}
- def getLoggingEvent() {
- return logger.list[0]
+ def createResponseEventFromDmi(correlationId, dmiPluginName, status) {
+ def jsonData ='{"data":{"statusCode":"1","statusMessage":"' + status + '"}}'
+ def dmiOutEventData = jsonObjectMapper.convertJsonString(jsonData, DataJobSubscriptionDmiOutEvent.class)
+ def dmiOutEvent = CloudEventBuilder.v1()
+ .withData(objectMapper.writeValueAsBytes(dmiOutEventData))
+ .withId('random-uuid')
+ .withType('subscriptionCreateResponse')
+ .withSource(URI.create(dmiPluginName))
+ .withExtension('correlationid', correlationId).build()
+ return dmiOutEvent
}
}
package org.onap.cps.ncmp.impl.datajobs.subscription.ncmp
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
+import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.REJECTED
+
import org.onap.cps.ncmp.impl.datajobs.subscription.client_to_ncmp.DataSelector
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.DmiInEventMapper
import org.onap.cps.ncmp.impl.datajobs.subscription.dmi.EventProducer
import org.onap.cps.ncmp.impl.utils.JexParser
import spock.lang.Specification
-import static org.onap.cps.ncmp.impl.datajobs.subscription.models.CmSubscriptionStatus.ACCEPTED
-
class CmSubscriptionHandlerImplSpec extends Specification {
def mockCmSubscriptionPersistenceService = Mock(CmDataJobSubscriptionPersistenceService)
def mockInventoryPersistence = Mock(InventoryPersistence)
def mockAlternateIdMatcher = Mock(AlternateIdMatcher)
+ void setup() {
+ mockCmSubscriptionPersistenceService.isNewSubscriptionId(!'existingId') >> true
+ }
+
def objectUnderTest = new CmSubscriptionHandlerImpl(mockCmSubscriptionPersistenceService, mockDmiInEventMapper,
mockDmiInEventProducer, mockInventoryPersistence, mockAlternateIdMatcher)
+ def 'Attempt to create already existing subscription.'() {
+ given: 'the persistence service indicates the id is not new'
+ mockCmSubscriptionPersistenceService.isNewSubscriptionId('existingId') >> false
+ when: 'attempt to create the subscription'
+ objectUnderTest.createSubscription(new DataSelector(), 'existingId', ['/someDataNodeSelector'])
+ then: 'request is ignored and no method is invoked'
+ 0 * mockCmSubscriptionPersistenceService.add(*_)
+ and: 'no events are sent'
+ 0 * mockDmiInEventProducer.send(*_)
+ }
+
def 'Process subscription CREATE request for new target [non existing]'() {
given: 'relevant subscription details'
def mySubId = 'dataJobId'
'data node selector for other dmi' | 'someOtherDmi' || 0
}
+ def 'Log update when subscription status is REJECTED'() {
+ given: 'dmi service name and subscription id'
+ def myDmi = 'myDmi'
+ def mySubscriptionId = 'mySubscriptionId'
+ and: 'the persistence service returns all inactive data node selectors'
+ def myDataNodeSelectors = ['/parent[id=""]'].asList()
+ mockCmSubscriptionPersistenceService.getInactiveDataNodeSelectors(mySubscriptionId) >> myDataNodeSelectors
+ and: 'alternate id matcher always returns a cm handle id'
+ mockAlternateIdMatcher.getCmHandleId(_) >> 'someCmHandleId'
+ and: 'the inventory persistence service returns a yang model with the given dmi service name'
+ mockInventoryPersistence.getYangModelCmHandle(_) >> new YangModelCmHandle(dmiServiceName: myDmi)
+ when: 'update subscription status is called with status=REJECTED'
+ objectUnderTest.updateCmSubscriptionStatus(mySubscriptionId, myDmi, REJECTED)
+ then: 'the persistence service to update subscription status called with REJECTED for matching dmi name'
+ 1 * mockCmSubscriptionPersistenceService.updateCmSubscriptionStatus('/parent[id=""]', REJECTED)
+ }
+
def getFdn(dataNodeSelector) {
return JexParser.extractFdnPrefix(dataNodeSelector).orElse("")
+++ /dev/null
-{
- "data": {
- "statusCode": "1",
- "statusMessage": "ACCEPTED"
- }
-}
\ No newline at end of file