--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.cmstack.ves;
+
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.onap.cps.ncmp.dmi.service.DmiService;
+import org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class VesEventConsumer {
+
+ private final DmiService dmiService;
+
+ /**
+ * Consume the VES event to discover the cm handles.
+ *
+ * @param vesEventSchema Schema for virtual network function event stream
+ */
+ @KafkaListener(topics = "#{@vesEventsConfiguration.topicNames}",
+ containerFactory = "legacyEventConcurrentKafkaListenerContainerFactory",
+ properties = {"spring.json.value.default.type=org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema"})
+ public void consumeVesEvent(final VesEventSchema vesEventSchema) {
+
+ final String sourceName = vesEventSchema.getEvent().getCommonEventHeader().getSourceName();
+ log.info("SourceName( CmHandleId ) from the VES event is : {}", sourceName);
+ try {
+ dmiService.registerCmHandles(List.of(sourceName));
+ } catch (final Exception exception) {
+ log.warn("Exception occurred for CmHandleId : {} with cause : {}", sourceName, exception.getMessage(),
+ exception);
+ }
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.cmstack.ves;
+
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+
+@Getter
+@Setter
+@Component
+@ConfigurationProperties(prefix = "app.dmi.ves")
+public class VesEventsConfiguration {
+
+ private final List<String> topicNames = new ArrayList<>();
+
+}
/*
* ============LICENSE_START=======================================================
- * Copyright (C) 2023 Nordix Foundation
+ * Copyright (C) 2023-2025 OpenInfra Foundation Europe. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.cps.ncmp.dmi.config.kafka;
import io.cloudevents.CloudEvent;
+import java.time.Duration;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.producer.ProducerConfig;
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, CloudEvent>
- cloudEventConcurrentKafkaListenerContainerFactory() {
- final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> containerFactory =
- new ConcurrentKafkaListenerContainerFactory<>();
- containerFactory.setConsumerFactory(cloudEventConsumerFactory());
- return containerFactory;
+ cloudEventConcurrentKafkaListenerContainerFactory() {
+ final ConcurrentKafkaListenerContainerFactory<String, CloudEvent> concurrentKafkaListenerContainerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ concurrentKafkaListenerContainerFactory.setConsumerFactory(cloudEventConsumerFactory());
+ return concurrentKafkaListenerContainerFactory;
+ }
+
+ /**
+ * A legacy concurrent kafka listener container factory.
+ *
+ * @return instance of Concurrent kafka listener factory
+ */
+ @Bean
+ @Primary
+ public ConcurrentKafkaListenerContainerFactory<String, T> legacyEventConcurrentKafkaListenerContainerFactory() {
+ final ConcurrentKafkaListenerContainerFactory<String, T> concurrentKafkaListenerContainerFactory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ concurrentKafkaListenerContainerFactory.setConsumerFactory(legacyEventConsumerFactory());
+ concurrentKafkaListenerContainerFactory.getContainerProperties()
+ .setAuthExceptionRetryInterval(Duration.ofSeconds(10));
+ return concurrentKafkaListenerContainerFactory;
}
}
avc:
cm-subscription-dmi-in: ${CM_SUBSCRIPTION_DMI_IN_TOPIC:ncmp-dmi-cm-avc-subscription}
cm-subscription-dmi-out: ${CM_SUBSCRIPTION_DMI_OUT_TOPIC:dmi-ncmp-cm-avc-subscription}
+ ves:
+ topicNames:
+ - "unauthenticated.VES_PNFREG_OUTPUT"
+ - "unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT"
notification:
async:
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.cmstack.ves
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import ch.qos.logback.classic.spi.ILoggingEvent
+import ch.qos.logback.core.read.ListAppender
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.onap.cps.ncmp.dmi.TestUtils
+import org.onap.cps.ncmp.dmi.api.kafka.MessagingBaseSpec
+import org.onap.cps.ncmp.dmi.exception.CmHandleRegistrationException
+import org.onap.cps.ncmp.dmi.service.DmiService
+import org.onap.cps.ncmp.events.ves30_2_1.VesEventSchema
+import org.slf4j.LoggerFactory
+import org.spockframework.spring.SpringBean
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.annotation.DirtiesContext
+import org.testcontainers.spock.Testcontainers
+
+@SpringBootTest(classes = [ObjectMapper])
+@Testcontainers
+@DirtiesContext
+class VesEventConsumerSpec extends MessagingBaseSpec {
+
+ def objectMapper = new ObjectMapper()
+ def dmiService = Mock(DmiService)
+
+ @SpringBean
+ VesEventConsumer objectUnderTest = new VesEventConsumer(dmiService)
+
+ def logger = Spy(ListAppender<ILoggingEvent>)
+
+ def vesEvent
+
+ void setup() {
+
+ def jsonData = TestUtils.getResourceFileContent('sampleVesEvent.json')
+ vesEvent = objectMapper.readValue(jsonData, VesEventSchema.class)
+
+ ((Logger) LoggerFactory.getLogger(VesEventConsumer.class)).addAppender(logger)
+ logger.start()
+ }
+
+ void cleanup() {
+ ((Logger) LoggerFactory.getLogger(VesEventConsumer.class)).detachAndStopAllAppenders()
+ }
+
+
+ def 'Consume a VES event'() {
+ when: 'event is consumed'
+ objectUnderTest.consumeVesEvent(vesEvent)
+ then: 'cm handle(s) is registered with the dmi service'
+ 1 * dmiService.registerCmHandles(['pynts-o-du-o1'])
+
+ }
+
+ def 'Consume create event with error during registration'() {
+ given: 'an error occured during registration'
+ dmiService.registerCmHandles(_) >> { throw new CmHandleRegistrationException('some error for test') }
+ when: 'event is consumed'
+ objectUnderTest.consumeVesEvent(vesEvent)
+ then: 'the correct exception is logged as a warning'
+ def loggingEvent = logger.list[1]
+ assert loggingEvent.level == Level.WARN
+ assert loggingEvent.formattedMessage.contains('Not able to register the given cm-handles.')
+ }
+
+}
--- /dev/null
+/*
+ * ============LICENSE_START========================================================
+ * Copyright (c) 2025 OpenInfra Foundation Europe. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.dmi.cmstack.ves
+
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.context.properties.EnableConfigurationProperties
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.test.context.ContextConfiguration
+import spock.lang.Specification
+
+@SpringBootTest
+@ContextConfiguration(classes = [VesEventsConfiguration])
+@EnableConfigurationProperties
+class VesEventsConfigurationSpec extends Specification {
+
+ @Autowired
+ VesEventsConfiguration vesEventsConfiguration
+
+ def 'Check the test topics configured'() {
+ expect: 'VES topics are populated'
+ assert vesEventsConfiguration.topicNames.size() == 2
+ and: 'correct topics are present'
+ assert vesEventsConfiguration.topicNames.contains('unauthenticated.VES_PNFREG_OUTPUT')
+ assert vesEventsConfiguration.topicNames.contains('unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT')
+ }
+
+}
# ============LICENSE_START=======================================================
-# Copyright (C) 2021-2023 Nordix Foundation
+# Copyright (C) 2021-2025 OpenInfra Foundation Europe. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
ncmp:
async:
topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}
+ dmi:
+ ves:
+ topicNames:
+ - "unauthenticated.VES_PNFREG_OUTPUT"
+ - "unauthenticated.VES_O1_NOTIFY_PNF_REGISTRATION_OUTPUT"
logging:
format: json
--- /dev/null
+{
+ "event": {
+ "commonEventHeader": {
+ "sourceId": "ManagementElement=pynts-o-du-o1",
+ "startEpochMicrosec": 1742470240133788,
+ "eventId": "ManagedElement=pynts-o-du-o1_pnfRegistration",
+ "timeZoneOffset": "+00:00",
+ "reportingEntityId": "",
+ "internalHeaderFields": {
+ "collectorTimeStamp": "Thu, 03 20 2025 11:30:40 GMT"
+ },
+ "eventType": "PyNTS_pnfRegistration",
+ "priority": "Low",
+ "version": "4.1",
+ "nfVendorName": "pynts",
+ "reportingEntityName": "ManagementElement=pynts-o-du-o1",
+ "sequence": 0,
+ "domain": "pnfRegistration",
+ "lastEpochMicrosec": 1742470240133788,
+ "eventName": "pnfRegistration_PyNTS_pnfRegistration",
+ "vesEventListenerVersion": "7.2.1",
+ "sourceName": "pynts-o-du-o1",
+ "nfNamingCode": "001"
+ },
+ "pnfRegistrationFields": {
+ "unitType": "o-du-o1",
+ "macAddress": "36:e9:09:e9:28:36",
+ "serialNumber": "pynts-o-du-o1-172.20.0.5-pynts",
+ "additionalFields": {
+ "protocol": "TLS",
+ "oamPort": "6513",
+ "betweenAttemptsTimeout": "2000",
+ "keepaliveDelay": "120",
+ "sleep-factor": "1.5",
+ "reconnectOnChangedSchema": "false",
+ "keyId": "tls-endpoint",
+ "connectionTimeout": "20000",
+ "maxConnectionAttempts": "100",
+ "tcpOnly": "false",
+ "username": "netconf"
+ },
+ "pnfRegistrationFieldsVersion": "2.1",
+ "manufactureDate": "2021-01-16",
+ "modelNumber": "pynts",
+ "lastServiceDate": "2021-03-26",
+ "unitFamily": "pynts-o-du-o1",
+ "vendorName": "pynts",
+ "oamV4IpAddress": "172.20.0.5",
+ "softwareVersion": "2.3.5"
+ }
+ }
+}
\ No newline at end of file