From 7276e0fb13ad9652f6e8b8cf20531507840ac4ff Mon Sep 17 00:00:00 2001 From: mpriyank Date: Fri, 18 Apr 2025 11:54:27 +0100 Subject: [PATCH] [DMI] Automate the process of device registration #1 - read from agreed topic using the VesEventSchema and call the registration endpoint in cps-ncmp to automate the device registration during mounting of the device. - also added non cloud kafka consumer factory as VES event is not cloud compliant - testware added Issue-ID: CPS-2710 Change-Id: I5ab695afc225dcc372cff00a2f6f69c9047b14ed Signed-off-by: mpriyank --- .../cps/ncmp/dmi/cmstack/ves/VesEventConsumer.java | 58 +++++++++++++++ .../dmi/cmstack/ves/VesEventsConfiguration.java | 39 ++++++++++ .../cps/ncmp/dmi/config/kafka/KafkaConfig.java | 29 ++++++-- dmi-service/src/main/resources/application.yml | 4 + .../dmi/cmstack/ves/VesEventConsumerSpec.groovy | 87 ++++++++++++++++++++++ .../cmstack/ves/VesEventsConfigurationSpec.groovy | 45 +++++++++++ dmi-service/src/test/resources/application.yml | 7 +- dmi-service/src/test/resources/sampleVesEvent.json | 52 +++++++++++++ 8 files changed, 314 insertions(+), 7 deletions(-) create mode 100644 dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventConsumer.java create mode 100644 dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventsConfiguration.java create mode 100644 dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventConsumerSpec.groovy create mode 100644 dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventsConfigurationSpec.groovy create mode 100644 dmi-service/src/test/resources/sampleVesEvent.json diff --git a/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventConsumer.java b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventConsumer.java new file mode 100644 index 00000000..656e7e38 --- /dev/null +++ b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventConsumer.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.cmstack.ves; + +import java.util.List; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.onap.cps.ncmp.dmi.service.DmiService; +import org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class VesEventConsumer { + + private final DmiService dmiService; + + /** + * Consume the VES event to discover the cm handles. + * + * @param vesEventSchema Schema for virtual network function event stream + */ + @KafkaListener(topics = "#{@vesEventsConfiguration.topicNames}", + containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory", + properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema"}) + public void consumeVesEvent(final VesEventSchema vesEventSchema) { + + final String sourceName = vesEventSchema.getEvent().getCommonEventHeader().getSourceName(); + log.info("SourceName( CmHandleId ) from the VES event is : {}", sourceName); + try { + dmiService.registerCmHandles(List.of(sourceName)); + } catch (final Exception exception) { + log.warn("Exception occurred for CmHandleId : {} with cause : {}", sourceName, exception.getMessage(), + exception); + } + } + +} diff --git a/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventsConfiguration.java b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventsConfiguration.java new file mode 100644 index 00000000..12262ad3 --- /dev/null +++ b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventsConfiguration.java @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.cmstack.ves; + +import java.util.ArrayList; +import java.util.List; +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + + +@Getter +@Setter +@Component +@ConfigurationProperties(prefix = "app.dmi.ves") +public class VesEventsConfiguration { + + private final List topicNames = new ArrayList<>(); + +} diff --git a/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java index 25ee92ae..7be61f59 100644 --- a/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java +++ b/dmi-service/src/main/java/org/onap/cps/ncmp/dmi/config/kafka/KafkaConfig.java @@ -1,6 +1,6 @@ /* * ============LICENSE_START======================================================= - * Copyright (C) 2023 Nordix Foundation + * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ package org.onap.cps.ncmp.dmi.config.kafka; import io.cloudevents.CloudEvent; +import java.time.Duration; import java.util.Map; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.producer.ProducerConfig; @@ -132,11 +133,27 @@ public class KafkaConfig { */ @Bean public ConcurrentKafkaListenerContainerFactory - cloudEventConcurrentKafkaListenerContainerFactory() { - final ConcurrentKafkaListenerContainerFactory containerFactory = - new ConcurrentKafkaListenerContainerFactory<>(); - containerFactory.setConsumerFactory(cloudEventConsumerFactory()); - return containerFactory; + cloudEventConcurrentKafkaListenerContainerFactory() { + final ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory = + new ConcurrentKafkaListenerContainerFactory<>(); + concurrentKafkaListenerContainerFactory.setConsumerFactory(cloudEventConsumerFactory()); + return concurrentKafkaListenerContainerFactory; + } + + /** + * A legacy concurrent kafka listener container factory. + * + * @return instance of Concurrent kafka listener factory + */ + @Bean + @Primary + public ConcurrentKafkaListenerContainerFactory legacyEventConcurrentKafkaListenerContainerFactory() { + final ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory = + new ConcurrentKafkaListenerContainerFactory<>(); + concurrentKafkaListenerContainerFactory.setConsumerFactory(legacyEventConsumerFactory()); + concurrentKafkaListenerContainerFactory.getContainerProperties() + .setAuthExceptionRetryInterval(Duration.ofSeconds(10)); + return concurrentKafkaListenerContainerFactory; } } diff --git a/dmi-service/src/main/resources/application.yml b/dmi-service/src/main/resources/application.yml index 003aa191..3f78e9fc 100644 --- a/dmi-service/src/main/resources/application.yml +++ b/dmi-service/src/main/resources/application.yml @@ -74,6 +74,10 @@ app: avc: cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription} cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription} + ves: + topicNames: + - "unauthenticated.VES_PNFREG_OUTPUT" + - "unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT" notification: async: diff --git a/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventConsumerSpec.groovy b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventConsumerSpec.groovy new file mode 100644 index 00000000..4a65c113 --- /dev/null +++ b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventConsumerSpec.groovy @@ -0,0 +1,87 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.cmstack.ves + +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import ch.qos.logback.classic.spi.ILoggingEvent +import ch.qos.logback.core.read.ListAppender +import com.fasterxml.jackson.databind.ObjectMapper +import org.onap.cps.ncmp.dmi.TestUtils +import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec +import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException +import org.onap.cps.ncmp.dmi.service.DmiService +import org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema +import org.slf4j.LoggerFactory +import org.spockframework.spring.SpringBean +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.annotation.DirtiesContext +import org.testcontainers.spock.Testcontainers + +@SpringBootTest(classes = [ObjectMapper]) +@Testcontainers +@DirtiesContext +class VesEventConsumerSpec extends MessagingBaseSpec { + + def objectMapper = new ObjectMapper() + def dmiService = Mock(DmiService) + + @SpringBean + VesEventConsumer objectUnderTest = new VesEventConsumer(dmiService) + + def logger = Spy(ListAppender) + + def vesEvent + + void setup() { + + def jsonData = TestUtils.getResourceFileContent('sampleVesEvent.json') + vesEvent = objectMapper.readValue(jsonData, VesEventSchema.class) + + ((Logger) LoggerFactory.getLogger(VesEventConsumer.class)).addAppender(logger) + logger.start() + } + + void cleanup() { + ((Logger) LoggerFactory.getLogger(VesEventConsumer.class)).detachAndStopAllAppenders() + } + + + def 'Consume a VES event'() { + when: 'event is consumed' + objectUnderTest.consumeVesEvent(vesEvent) + then: 'cm handle(s) is registered with the dmi service' + 1 * dmiService.registerCmHandles(['pynts-o-du-o1']) + + } + + def 'Consume create event with error during registration'() { + given: 'an error occured during registration' + dmiService.registerCmHandles(_) >> { throw new CmHandleRegistrationException('some error for test') } + when: 'event is consumed' + objectUnderTest.consumeVesEvent(vesEvent) + then: 'the correct exception is logged as a warning' + def loggingEvent = logger.list[1] + assert loggingEvent.level == Level.WARN + assert loggingEvent.formattedMessage.contains('Not able to register the given cm-handles.') + } + +} diff --git a/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventsConfigurationSpec.groovy b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventsConfigurationSpec.groovy new file mode 100644 index 00000000..e46274fc --- /dev/null +++ b/dmi-service/src/test/groovy/org/onap/cps/ncmp/dmi/cmstack/ves/VesEventsConfigurationSpec.groovy @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================== + * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.dmi.cmstack.ves + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.test.context.ContextConfiguration +import spock.lang.Specification + +@SpringBootTest +@ContextConfiguration(classes = [VesEventsConfiguration]) +@EnableConfigurationProperties +class VesEventsConfigurationSpec extends Specification { + + @Autowired + VesEventsConfiguration vesEventsConfiguration + + def 'Check the test topics configured'() { + expect: 'VES topics are populated' + assert vesEventsConfiguration.topicNames.size() == 2 + and: 'correct topics are present' + assert vesEventsConfiguration.topicNames.contains('unauthenticated.VES_PNFREG_OUTPUT') + assert vesEventsConfiguration.topicNames.contains('unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT') + } + +} diff --git a/dmi-service/src/test/resources/application.yml b/dmi-service/src/test/resources/application.yml index ddc2b45f..6cd5169f 100644 --- a/dmi-service/src/test/resources/application.yml +++ b/dmi-service/src/test/resources/application.yml @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (C) 2021-2023 Nordix Foundation +# Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -74,6 +74,11 @@ app: ncmp: async: topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m} + dmi: + ves: + topicNames: + - "unauthenticated.VES_PNFREG_OUTPUT" + - "unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT" logging: format: json diff --git a/dmi-service/src/test/resources/sampleVesEvent.json b/dmi-service/src/test/resources/sampleVesEvent.json new file mode 100644 index 00000000..02cd5a78 --- /dev/null +++ b/dmi-service/src/test/resources/sampleVesEvent.json @@ -0,0 +1,52 @@ +{ + "event": { + "commonEventHeader": { + "sourceId": "ManagementElement=pynts-o-du-o1", + "startEpochMicrosec": 1742470240133788, + "eventId": "ManagedElement=pynts-o-du-o1_pnfRegistration", + "timeZoneOffset": "+00:00", + "reportingEntityId": "", + "internalHeaderFields": { + "collectorTimeStamp": "Thu, 03 20 2025 11:30:40 GMT" + }, + "eventType": "PyNTS_pnfRegistration", + "priority": "Low", + "version": "4.1", + "nfVendorName": "pynts", + "reportingEntityName": "ManagementElement=pynts-o-du-o1", + "sequence": 0, + "domain": "pnfRegistration", + "lastEpochMicrosec": 1742470240133788, + "eventName": "pnfRegistration_PyNTS_pnfRegistration", + "vesEventListenerVersion": "7.2.1", + "sourceName": "pynts-o-du-o1", + "nfNamingCode": "001" + }, + "pnfRegistrationFields": { + "unitType": "o-du-o1", + "macAddress": "36:e9:09:e9:28:36", + "serialNumber": "pynts-o-du-o1-172.20.0.5-pynts", + "additionalFields": { + "protocol": "TLS", + "oamPort": "6513", + "betweenAttemptsTimeout": "2000", + "keepaliveDelay": "120", + "sleep-factor": "1.5", + "reconnectOnChangedSchema": "false", + "keyId": "tls-endpoint", + "connectionTimeout": "20000", + "maxConnectionAttempts": "100", + "tcpOnly": "false", + "username": "netconf" + }, + "pnfRegistrationFieldsVersion": "2.1", + "manufactureDate": "2021-01-16", + "modelNumber": "pynts", + "lastServiceDate": "2021-03-26", + "unitFamily": "pynts-o-du-o1", + "vendorName": "pynts", + "oamV4IpAddress": "172.20.0.5", + "softwareVersion": "2.3.5" + } + } +} \ No newline at end of file -- 2.16.6