public class CmNotificationSubscriptionDmiInEventConsumer {
- @Value("${app.dmi.avc.subscription-response-topic}")
- private String cmNotificationSubscriptionResponseTopic;
+ @Value("${app.dmi.avc.cm-subscription-dmi-out}")
+ private String cmNotificationSubscriptionDmiOutTopic;
@Value("${dmi.service.name}")
private String dmiName;
private final KafkaTemplate<String, CloudEvent> cloudEventKafkaTemplate;
*
* @param cmNotificationSubscriptionDmiInCloudEvent the event to be consumed
*/
- @KafkaListener(topics = "${app.dmi.avc.subscription-topic}",
+ @KafkaListener(topics = "${app.dmi.avc.cm-subscription-dmi-in}",
containerFactory = "cloudEventConcurrentKafkaListenerContainerFactory")
public void consumeCmNotificationSubscriptionDmiInEvent(
final ConsumerRecord<String, CloudEvent> cmNotificationSubscriptionDmiInCloudEvent) {
final String correlationId = String.valueOf(cmNotificationSubscriptionDmiInCloudEvent.value()
.getExtension("correlationid"));
- if ("subscriptionCreated".equals(subscriptionType)) {
+ if ("subscriptionCreateRequest".equals(subscriptionType)) {
createAndSendCmNotificationSubscriptionDmiOutEvent(subscriptionId, "subscriptionCreateResponse",
correlationId, CmNotificationSubscriptionStatus.ACCEPTED);
- } else if ("subscriptionDeleted".equals(subscriptionType)) {
+ } else if ("subscriptionDeleteRequest".equals(subscriptionType)) {
createAndSendCmNotificationSubscriptionDmiOutEvent(subscriptionId, "subscriptionDeleteResponse",
correlationId, CmNotificationSubscriptionStatus.ACCEPTED);
}
cmNotificationSubscriptionDmiOutEventData.setStatusCode("1");
cmNotificationSubscriptionDmiOutEventData.setStatusMessage("ACCEPTED");
} else {
- cmNotificationSubscriptionDmiOutEventData.setStatusCode("2");
+ cmNotificationSubscriptionDmiOutEventData.setStatusCode("104");
cmNotificationSubscriptionDmiOutEventData.setStatusMessage("REJECTED");
}
cmNotificationSubscriptionDmiOutEvent.setData(cmNotificationSubscriptionDmiOutEventData);
- cloudEventKafkaTemplate.send(cmNotificationSubscriptionResponseTopic, eventKey,
+ cloudEventKafkaTemplate.send(cmNotificationSubscriptionDmiOutTopic, eventKey,
CmNotificationSubscriptionDmiOutEventToCloudEventMapper.toCloudEvent(cmNotificationSubscriptionDmiOutEvent,
subscriptionType, dmiName, correlationId));
# ============LICENSE_START=======================================================
-# Copyright (C) 2021-2023 Nordix Foundation
+# Copyright (C) 2021-2024 Nordix Foundation
# Modifications Copyright (C) 2021 Bell Canada.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
dmi:
avc:
- subscription-topic: ${DMI_CM_AVC_SUBSCRIPTION:ncmp-dmi-cm-avc-subscription-${dmi.service.name}}
- subscription-response-topic: ${DMI_CM_AVC_SUBSCRIPTION_RESPONSE:dmi-ncmp-cm-avc-subscription}
+ cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
+ cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
notification:
async:
import java.time.ZoneId
-@SpringBootTest(classes = [CmNotificationSubscriptionDmiInEventConsumer])
@Testcontainers
@DirtiesContext
class CmNotificationSubscriptionDmiInEventConsumerSpec extends MessagingBaseSpec {
def objectMapper = new ObjectMapper()
def testTopic = 'dmi-ncmp-cm-avc-subscription'
+ def testDmiName = 'test-ncmp-dmi'
@SpringBean
CmNotificationSubscriptionDmiInEventConsumer objectUnderTest = new CmNotificationSubscriptionDmiInEventConsumer(cloudEventKafkaTemplate)
def 'Sends subscription cloud event response successfully.'() {
given: 'an subscription event response'
- objectUnderTest.dmiName = 'test-ncmp-dmi'
- objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
+ objectUnderTest.dmiName = testDmiName
+ objectUnderTest.cmNotificationSubscriptionDmiOutTopic = testTopic
def correlationId = 'test-subscriptionId#test-ncmp-dmi'
def cmSubscriptionDmiOutEventData = new Data(statusCode: subscriptionStatusCode, statusMessage: subscriptionStatusMessage)
def subscriptionEventResponse =
where: 'given #scenario'
scenario | subscriptionAcceptanceType | subscriptionStatusCode | subscriptionStatusMessage
'Subscription is Accepted' | CmNotificationSubscriptionStatus.ACCEPTED | '1' | 'ACCEPTED'
- 'Subscription is Rejected' | CmNotificationSubscriptionStatus.REJECTED | '2' | 'REJECTED'
+ 'Subscription is Rejected' | CmNotificationSubscriptionStatus.REJECTED | '104' | 'REJECTED'
}
def 'Consume valid message.'() {
given: 'an event'
- objectUnderTest.dmiName = 'test-ncmp-dmi'
+ objectUnderTest.dmiName = testDmiName
def eventKey = UUID.randomUUID().toString()
def timestamp = new Timestamp(1679521929511)
def jsonData = TestUtils.getResourceFileContent('cmNotificationSubscriptionCreationEvent.json')
def subscriptionEvent = objectMapper.readValue(jsonData, CmNotificationSubscriptionDmiInEvent.class)
- objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
+ objectUnderTest.cmNotificationSubscriptionDmiOutTopic = testTopic
def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
.withType(subscriptionType)
.withDataSchema(URI.create("urn:cps:" + CmNotificationSubscriptionDmiInEvent.class.getName() + ":1.0.0"))
def 'Consume invalid message.'() {
given: 'an invalid event body'
- objectUnderTest.dmiName = 'test-ncmp-dmi'
+ objectUnderTest.dmiName = testDmiName
def eventKey = UUID.randomUUID().toString()
def timestamp = new Timestamp(1679521929511)
def invalidJsonBody = "/////"
- objectUnderTest.cmNotificationSubscriptionResponseTopic = testTopic
+ objectUnderTest.cmNotificationSubscriptionDmiOutTopic = testTopic
def cloudEvent = CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create('test-ncmp-dmi'))
.withType("subscriptionCreated")
.withDataSchema(URI.create("urn:cps:org.onap.ncmp.dmi.cm.subscription:1.0.0"))