From: JosephKeenan Date: Thu, 16 Jun 2022 15:19:09 +0000 (+0100) Subject: Kafka consumer can not be turned off X-Git-Tag: 3.1.0~89^2 X-Git-Url: https://gerrit.onap.org/r/gitweb?p=cps.git;a=commitdiff_plain;h=1c90848a0cb078e0249a7dc888ea05390f59a1e6 Kafka consumer can not be turned off -NOTE: Build will fail until docker-compose version issues on build server are fixed --Ticket raised https://jira.linuxfoundation.org/plugins/servlet/theme/portal/2/IT-24219 -added flag for async -added response if async is triggered without being enabled & associated test -modified to use one global flag for notifications Issue-ID: CPS-1088 Signed-off-by: JosephKeenan Change-Id: If9d988b4dcb71bf37c1b1bf9464090782708ffc2 --- diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index 802a18b84..14abebb2b 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -88,14 +88,14 @@ app: topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m} events: topic: ${NCMP_EVENTS_TOPIC:ncmp-events} + notification: + enabled: true data-updated: - enabled: false topic: ${CPS_CHANGE_EVENT_TOPIC:cps.data-updated-events} filters: enabled-dataspaces: ${NOTIFICATION_DATASPACE_FILTER_PATTERNS:""} async: - enabled: false executor: core-pool-size: 2 max-pool-size: 10 diff --git a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java index fb234ef71..5703d5e86 100755 --- a/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java +++ b/cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java @@ -28,7 +28,6 @@ import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum.PATCH; import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum.UPDATE; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -75,9 +74,10 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { private final NcmpRestInputMapper ncmpRestInputMapper; private final RestOutputCmHandleStateMapper restOutputCmHandleStateMapper; private final CpsNcmpTaskExecutor cpsNcmpTaskExecutor; - @Value("${notification.async.executor.time-out-value-in-ms:2000}") private int timeOutInMilliSeconds; + @Value("${notification.enabled:true}") + private boolean asyncEnabled; /** * Get resource data from operational datastore. @@ -93,15 +93,18 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { final @NotNull @Valid String resourceIdentifier, final @Valid String optionsParamInQuery, final @Valid String topicParamInQuery) { - if (isValidTopic(topicParamInQuery)) { + if (asyncEnabled && isValidTopic(topicParamInQuery)) { final String requestId = UUID.randomUUID().toString(); + log.info("Received Async passthrough-operational request with id {}", requestId); cpsNcmpTaskExecutor.executeTask(() -> - networkCmProxyDataService.getResourceDataOperationalForCmHandle( - cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, - requestId - ), timeOutInMilliSeconds + networkCmProxyDataService.getResourceDataOperationalForCmHandle( + cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, requestId + ), timeOutInMilliSeconds ); - return acknowledgeAsyncRequest(requestId); + return ResponseEntity.ok(Map.of("requestId", requestId)); + } else { + log.warn("Asynchronous messaging is currently disabled for passthrough-operational." + + " Will use synchronous operation."); } final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle( @@ -124,15 +127,18 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { final @NotNull @Valid String resourceIdentifier, final @Valid String optionsParamInQuery, final @Valid String topicParamInQuery) { - if (isValidTopic(topicParamInQuery)) { - final String resourceDataRequestId = UUID.randomUUID().toString(); + if (asyncEnabled && isValidTopic(topicParamInQuery)) { + final String requestId = UUID.randomUUID().toString(); + log.info("Received Async passthrough-running request with id {}", requestId); cpsNcmpTaskExecutor.executeTask(() -> networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle( - cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, - resourceDataRequestId + cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, requestId ), timeOutInMilliSeconds ); - return acknowledgeAsyncRequest(resourceDataRequestId); + return ResponseEntity.ok(Map.of("requestId", requestId)); + } else { + log.warn("Asynchronous messaging is currently disabled for passthrough-running." + + " Will use synchronous operation."); } final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle( @@ -301,11 +307,5 @@ public class NetworkCmProxyController implements NetworkCmProxyApi { throw new InvalidTopicException("Topic name " + topicName + " is invalid", "invalid topic"); } - private ResponseEntity acknowledgeAsyncRequest(final String requestId) { - final Map acknowledgeData = new HashMap<>(1); - acknowledgeData.put("requestId", requestId); - return ResponseEntity.ok(acknowledgeData); - } - } diff --git a/cps-ncmp-rest/src/test/resources/application.yml b/cps-ncmp-rest/src/test/resources/application.yml index 0241696c5..9df1e580f 100644 --- a/cps-ncmp-rest/src/test/resources/application.yml +++ b/cps-ncmp-rest/src/test/resources/application.yml @@ -1,5 +1,5 @@ # ============LICENSE_START======================================================= -# Copyright (C) 2021 Nordix Foundation +# Copyright (C) 2021-2022 Nordix Foundation # Modifications Copyright (C) 2021 Bell Canada. # ================================================================================ # Licensed under the Apache License, Version 2.0 (the "License"); @@ -23,6 +23,7 @@ rest: ncmp-inventory-base-path: /ncmpInventory notification: + enabled: true async: executor: time-out-value-in-ms: 2000 \ No newline at end of file diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java index 4e5c57ba5..a9e7164fd 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java @@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent; import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @@ -33,6 +34,7 @@ import org.springframework.stereotype.Component; @Component @Slf4j @RequiredArgsConstructor +@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) public class NcmpAsyncRequestResponseEventConsumer { private final NcmpAsyncRequestResponseEventProducer ncmpAsyncRequestResponseEventProducer; diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsService.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsService.java index 6804ac0f0..7b5ceb57a 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsService.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsService.java @@ -47,17 +47,23 @@ public class NcmpEventsService { @Value("${app.ncmp.events.topic:ncmp-events}") private String topicName; + @Value("${notification.enabled:true}") + private boolean notificationsEnabled; + /** * Publish the NcmpEvent to the public topic. * * @param cmHandleId Cm Handle Id */ public void publishNcmpEvent(final String cmHandleId) { - - final NcmpServiceCmHandle ncmpServiceCmHandle = YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle( - inventoryPersistence.getYangModelCmHandle(cmHandleId)); - final NcmpEvent ncmpEvent = ncmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmHandle); - ncmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent); - + if (notificationsEnabled) { + final NcmpServiceCmHandle ncmpServiceCmHandle = + YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle( + inventoryPersistence.getYangModelCmHandle(cmHandleId)); + final NcmpEvent ncmpEvent = ncmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmHandle); + ncmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent); + } else { + log.debug("Notifications disabled."); + } } } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsServiceSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsServiceSpec.groovy index e265fef05..52806a867 100644 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsServiceSpec.groovy +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsServiceSpec.groovy @@ -30,17 +30,25 @@ class NcmpEventsServiceSpec extends Specification { def mockInventoryPersistence = Mock(InventoryPersistence) def mockNcmpEventsPublisher = Mock(NcmpEventsPublisher) - def mockNcmpEventsMapper = Mock(NcmpEventsCreator) + def mockNcmpEventsCreator = Mock(NcmpEventsCreator) - def objectUnderTest = new NcmpEventsService(mockInventoryPersistence, mockNcmpEventsPublisher, mockNcmpEventsMapper) + def objectUnderTest = new NcmpEventsService(mockInventoryPersistence, mockNcmpEventsPublisher, mockNcmpEventsCreator) - def 'Create and Publish event for #operation'() { + def 'Create and Publish ncmp event where events are #scenario'() { given: 'a cm handle id and operation and responses are mocked' mockResponses('test-cm-handle-id', 'test-topic') + and: 'notifications enabled is #notificationsEnabled' + objectUnderTest.notificationsEnabled = notificationsEnabled when: 'service is called to publish ncmp event' objectUnderTest.publishNcmpEvent('test-cm-handle-id') - then: 'no exception is thrown' - noExceptionThrown() + then: 'creator is called #expectedTimesMethodCalled times' + expectedTimesMethodCalled * mockNcmpEventsCreator.populateNcmpEvent('test-cm-handle-id', _) + and: 'publisher is called #expectedTimesMethodCalled times' + expectedTimesMethodCalled * mockNcmpEventsPublisher.publishEvent(*_) + where: 'the following values are used' + scenario | notificationsEnabled|| expectedTimesMethodCalled + 'enabled' | true || 1 + 'disabled' | false || 0 } def mockResponses(cmHandleId, topicName) { @@ -50,9 +58,8 @@ class NcmpEventsServiceSpec extends Specification { def ncmpServiceCmhandle = YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle(yangModelCmHandle) mockInventoryPersistence.getYangModelCmHandle(cmHandleId) >> yangModelCmHandle - mockNcmpEventsMapper.populateNcmpEvent(cmHandleId, ncmpServiceCmhandle) >> ncmpEvent + mockNcmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmhandle) >> ncmpEvent mockNcmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent) >> {} } - } diff --git a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java index 2667ef490..2d8f7fb08 100644 --- a/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java +++ b/cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java @@ -1,6 +1,7 @@ /* * ============LICENSE_START======================================================= * Copyright (c) 2021 Bell Canada. + * Modifications Copyright (C) 2022 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +34,7 @@ import org.springframework.validation.annotation.Validated; @EnableAsync @Configuration -@ConditionalOnProperty(name = "notification.async.enabled", havingValue = "true", matchIfMissing = false) +@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true) @ConfigurationProperties("notification.async.executor") @Validated @Setter diff --git a/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java b/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java index eb75e3f75..3776a93d9 100644 --- a/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java +++ b/cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import javax.validation.constraints.NotNull; import lombok.Data; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import org.springframework.validation.annotation.Validated; @@ -36,6 +37,7 @@ public class NotificationProperties { @NotNull private String topic; private Map filters = Collections.emptyMap(); - @NotNull - private boolean enabled = false; + + @Value("${notification.enabled:true}") + private boolean enabled; } diff --git a/cps-service/src/test/resources/application.yml b/cps-service/src/test/resources/application.yml index a28b40083..04295eb74 100644 --- a/cps-service/src/test/resources/application.yml +++ b/cps-service/src/test/resources/application.yml @@ -18,13 +18,12 @@ # ============LICENSE_END========================================================= notification: + enabled: true data-updated: filters: enabled-dataspaces: ".*-published,.*-important" - enabled: true topic: cps-event async: - enabled: true executor: core-pool-size: 2 max-pool-size: 10 diff --git a/csit/plans/cps/pnfsim/docker-compose.yml b/csit/plans/cps/pnfsim/docker-compose.yml old mode 100755 new mode 100644 diff --git a/csit/plans/cps/setup.sh b/csit/plans/cps/setup.sh index 59542402f..43575f944 100755 --- a/csit/plans/cps/setup.sh +++ b/csit/plans/cps/setup.sh @@ -61,12 +61,12 @@ mkdir -p $WORKSPACE/archives/dc-cps cp $WORKSPACE/../docker-compose/*.yml $WORKSPACE/archives/dc-cps cd $WORKSPACE/archives/dc-cps -# download docker-compose of a required version (1.25.0 supports configuration of version 3.7) -curl -L https://github.com/docker/compose/releases/download/1.25.0/docker-compose-`uname -s`-`uname -m` > docker-compose +curl -L https://github.com/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` > docker-compose chmod +x docker-compose +docker-compose version # start CPS/NCMP, DMI, and PostgreSQL containers with docker compose -./docker-compose up -d +docker-compose up -d ###################### setup sdnc ####################################### source $WORKSPACE/plans/cps/sdnc/sdnc_setup.sh diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml old mode 100755 new mode 100644 index 9edea3526..eafcb3c9f --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -16,74 +16,12 @@ # limitations under the License. # ============LICENSE_END========================================================= -version: "3.7" +version: '3.3' services: - ### Services cps-service, cps-ncmp, zookeeper and kafka are commented below, these - ### services can be un-commented and used on need to use basis. Only minimal - ### services will run (dbpostgresql, cps-and-ncmp and ncmp-dmi-plugin) by default. - #cps-standalone: - # container_name: cps-service - # image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/cps-service:${VERSION:-latest} - # ports: - # - "8881:8080" - # - "8887:8081" - # environment: - # CPS_USERNAME: ${CPS_CORE_USERNAME:-cpsuser} - # CPS_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!} - # DB_HOST: dbpostgresql - # DB_USERNAME: ${DB_USERNAME:-cps} - # DB_PASSWORD: ${DB_PASSWORD:-cps} - # #KAFKA_BOOTSTRAP_SERVER: kafka:9092 - # #notification.data-updated.enabled: 'true' - # #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' - # restart: unless-stopped - # depends_on: - # - dbpostgresql - - #ncmp-standalone: - # container_name: cps-ncmp - # image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/cps-ncmp:${VERSION:-latest} - # ports: - # - "8882:8080" - # - "8887:8081" - # environment: - # CPS_USERNAME: ${CPS_CORE_USERNAME:-cpsuser} - # CPS_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!} - # DB_HOST: dbpostgresql - # DB_USERNAME: ${DB_USERNAME:-cps} - # DB_PASSWORD: ${DB_PASSWORD:-cps} - # DMI_USERNAME: ${DMI_USERNAME:-cpsuser} - # DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!} - # #KAFKA_BOOTSTRAP_SERVER: kafka:9092 - # #notification.data-updated.enabled: 'true' - # #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' - # restart: unless-stopped - # depends_on: - # - dbpostgresql - - # zookeeper: - # image: confluentinc/cp-zookeeper:6.2.1 - # environment: - # ZOOKEEPER_CLIENT_PORT: 2181 - # ZOOKEEPER_TICK_TIME: 2000 - # ports: - # - 22181:2181 - # - # kafka: - # image: confluentinc/cp-kafka:6.2.1 - # depends_on: - # - zookeeper - # ports: - # - 29092:29092 - # environment: - # KAFKA_BROKER_ID: 1 - # KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - # KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 - # KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - # KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - # KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + ### docker-compose up -d -> run ALL services ### + ### to disable notifications make notification.enabled to false & comment out kafka/zookeeper services ### dbpostgresql: container_name: dbpostgresql @@ -110,7 +48,8 @@ services: DMI_USERNAME: ${DMI_USERNAME:-cpsuser} DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!} KAFKA_BOOTSTRAP_SERVER: kafka:9092 - notification.data-updated.enabled: 'true' + notification.enabled: 'true' + notification.async.executor.time-out-value-in-ms: 2000 NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*' TIMERS_ADVISED-MODULES-SYNC_SLEEP-TIME-MS: 2000 restart: unless-stopped diff --git a/docs/deployment.rst b/docs/deployment.rst index 7dd4494f2..c0d8b6029 100644 --- a/docs/deployment.rst +++ b/docs/deployment.rst @@ -246,15 +246,12 @@ Any spring supported property can be configured by providing in ``config.additio | config.additional. | Kafka topic to publish to cps-temporal | ``cps.data-updated-events`` | | notification.data-updated.topic | | | +---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+ -| config.additional. | If notification from cps-core to cps-temporal is enabled or not. | ``true`` | -| notification.data-updated.enabled | If this is set to false, then the config.publisher properties could be skipped. | | -+---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+ | config.additional. | Dataspaces to be enabled for publishing events to cps-temporal | ```` | | notification.data-updated.filters. | | | | enabled-dataspaces | | | +---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+ -| config.additional. | If notifications should be processed in synchronous or asynchronous manner | ``false`` | -| notification.async.enabled | | | +| config.additional. | If asynchronous messaging, user notifications, and updated event persistence should be enabled | ``true`` | +| notification.enabled | | | +---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+ | config.additional. | Core pool size in asynchronous execution of notification. | ``2`` | | notification.async.executor. | | |