Kafka consumer can not be turned off 36/129636/36
authorJosephKeenan <joseph.keenan@est.tech>
Thu, 16 Jun 2022 15:19:09 +0000 (16:19 +0100)
committerToine Siebelink <toine.siebelink@est.tech>
Thu, 30 Jun 2022 07:44:46 +0000 (07:44 +0000)
-NOTE: Build will fail until docker-compose version issues on build
server are fixed
  --Ticket raised https://jira.linuxfoundation.org/plugins/servlet/theme/portal/2/IT-24219
-added flag for async
-added response if async is triggered without being enabled & associated test
-modified to use one global flag for notifications

Issue-ID: CPS-1088
Signed-off-by: JosephKeenan <joseph.keenan@est.tech>
Change-Id: If9d988b4dcb71bf37c1b1bf9464090782708ffc2

13 files changed:
cps-application/src/main/resources/application.yml
cps-ncmp-rest/src/main/java/org/onap/cps/ncmp/rest/controller/NetworkCmProxyController.java
cps-ncmp-rest/src/test/resources/application.yml
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/async/NcmpAsyncRequestResponseEventConsumer.java
cps-ncmp-service/src/main/java/org/onap/cps/ncmp/api/impl/event/NcmpEventsService.java
cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/api/impl/event/NcmpEventsServiceSpec.groovy
cps-service/src/main/java/org/onap/cps/config/AsyncConfig.java
cps-service/src/main/java/org/onap/cps/notification/NotificationProperties.java
cps-service/src/test/resources/application.yml
csit/plans/cps/pnfsim/docker-compose.yml [changed mode: 0755->0644]
csit/plans/cps/setup.sh
docker-compose/docker-compose.yml [changed mode: 0755->0644]
docs/deployment.rst

index 802a18b..14abebb 100644 (file)
@@ -88,14 +88,14 @@ app:
             topic: ${NCMP_ASYNC_M2M_TOPIC:ncmp-async-m2m}\r
         events:\r
             topic: ${NCMP_EVENTS_TOPIC:ncmp-events}\r
+\r
 notification:\r
+    enabled: true\r
     data-updated:\r
-        enabled: false\r
         topic: ${CPS_CHANGE_EVENT_TOPIC:cps.data-updated-events}\r
         filters:\r
             enabled-dataspaces: ${NOTIFICATION_DATASPACE_FILTER_PATTERNS:""}\r
     async:\r
-        enabled: false\r
         executor:\r
             core-pool-size: 2\r
             max-pool-size: 10\r
index fb234ef..5703d5e 100755 (executable)
@@ -28,7 +28,6 @@ import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum
 import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum.PATCH;
 import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum.UPDATE;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -75,9 +74,10 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
     private final NcmpRestInputMapper ncmpRestInputMapper;
     private final RestOutputCmHandleStateMapper restOutputCmHandleStateMapper;
     private final CpsNcmpTaskExecutor cpsNcmpTaskExecutor;
-
     @Value("${notification.async.executor.time-out-value-in-ms:2000}")
     private int timeOutInMilliSeconds;
+    @Value("${notification.enabled:true}")
+    private boolean asyncEnabled;
 
     /**
      * Get resource data from operational datastore.
@@ -93,15 +93,18 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
                                                                         final @NotNull @Valid String resourceIdentifier,
                                                                         final @Valid String optionsParamInQuery,
                                                                         final @Valid String topicParamInQuery) {
-        if (isValidTopic(topicParamInQuery)) {
+        if (asyncEnabled && isValidTopic(topicParamInQuery)) {
             final String requestId = UUID.randomUUID().toString();
+            log.info("Received Async passthrough-operational request with id {}", requestId);
             cpsNcmpTaskExecutor.executeTask(() ->
-                networkCmProxyDataService.getResourceDataOperationalForCmHandle(
-                    cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
-                        requestId
-                ), timeOutInMilliSeconds
+                    networkCmProxyDataService.getResourceDataOperationalForCmHandle(
+                        cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, requestId
+                    ), timeOutInMilliSeconds
             );
-            return acknowledgeAsyncRequest(requestId);
+            return ResponseEntity.ok(Map.of("requestId", requestId));
+        } else {
+            log.warn("Asynchronous messaging is currently disabled for passthrough-operational."
+                + " Will use synchronous operation.");
         }
 
         final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(
@@ -124,15 +127,18 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
                                                                     final @NotNull @Valid String resourceIdentifier,
                                                                     final @Valid String optionsParamInQuery,
                                                                     final @Valid String topicParamInQuery) {
-        if (isValidTopic(topicParamInQuery)) {
-            final String resourceDataRequestId = UUID.randomUUID().toString();
+        if (asyncEnabled && isValidTopic(topicParamInQuery)) {
+            final String requestId = UUID.randomUUID().toString();
+            log.info("Received Async passthrough-running request with id {}", requestId);
             cpsNcmpTaskExecutor.executeTask(() ->
                 networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
-                    cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
-                        resourceDataRequestId
+                    cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, requestId
                 ), timeOutInMilliSeconds
             );
-            return acknowledgeAsyncRequest(resourceDataRequestId);
+            return ResponseEntity.ok(Map.of("requestId", requestId));
+        } else {
+            log.warn("Asynchronous messaging is currently disabled for passthrough-running."
+                + " Will use synchronous operation.");
         }
 
         final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
@@ -301,11 +307,5 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
         throw new InvalidTopicException("Topic name " + topicName + " is invalid", "invalid topic");
     }
 
-    private ResponseEntity<Object> acknowledgeAsyncRequest(final String requestId) {
-        final Map<String, Object> acknowledgeData = new HashMap<>(1);
-        acknowledgeData.put("requestId", requestId);
-        return ResponseEntity.ok(acknowledgeData);
-    }
-
 }
 
index 0241696..9df1e58 100644 (file)
@@ -1,5 +1,5 @@
 #  ============LICENSE_START=======================================================
-#  Copyright (C) 2021 Nordix Foundation
+#  Copyright (C) 2021-2022 Nordix Foundation
 #  Modifications Copyright (C) 2021 Bell Canada.
 #  ================================================================================
 #  Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,6 +23,7 @@ rest:
         ncmp-inventory-base-path: /ncmpInventory
 
 notification:
+    enabled: true
     async:
         executor:
             time-out-value-in-ms: 2000
\ No newline at end of file
index 4e5c57b..a9e7164 100644 (file)
@@ -24,6 +24,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.onap.cps.ncmp.event.model.DmiAsyncRequestResponseEvent;
 import org.onap.cps.ncmp.event.model.NcmpAsyncRequestResponseEvent;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
@@ -33,6 +34,7 @@ import org.springframework.stereotype.Component;
 @Component
 @Slf4j
 @RequiredArgsConstructor
+@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 public class NcmpAsyncRequestResponseEventConsumer {
 
     private final NcmpAsyncRequestResponseEventProducer ncmpAsyncRequestResponseEventProducer;
index 6804ac0..7b5ceb5 100644 (file)
@@ -47,17 +47,23 @@ public class NcmpEventsService {
     @Value("${app.ncmp.events.topic:ncmp-events}")
     private String topicName;
 
+    @Value("${notification.enabled:true}")
+    private boolean notificationsEnabled;
+
     /**
      * Publish the NcmpEvent to the public topic.
      *
      * @param cmHandleId Cm Handle Id
      */
     public void publishNcmpEvent(final String cmHandleId) {
-
-        final NcmpServiceCmHandle ncmpServiceCmHandle = YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle(
-                inventoryPersistence.getYangModelCmHandle(cmHandleId));
-        final NcmpEvent ncmpEvent = ncmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmHandle);
-        ncmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent);
-
+        if (notificationsEnabled) {
+            final NcmpServiceCmHandle ncmpServiceCmHandle =
+                YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle(
+                    inventoryPersistence.getYangModelCmHandle(cmHandleId));
+            final NcmpEvent ncmpEvent = ncmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmHandle);
+            ncmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent);
+        } else {
+            log.debug("Notifications disabled.");
+        }
     }
 }
index e265fef..52806a8 100644 (file)
@@ -30,17 +30,25 @@ class NcmpEventsServiceSpec extends Specification {
 
     def mockInventoryPersistence = Mock(InventoryPersistence)
     def mockNcmpEventsPublisher = Mock(NcmpEventsPublisher)
-    def mockNcmpEventsMapper = Mock(NcmpEventsCreator)
+    def mockNcmpEventsCreator = Mock(NcmpEventsCreator)
 
-    def objectUnderTest = new NcmpEventsService(mockInventoryPersistence, mockNcmpEventsPublisher, mockNcmpEventsMapper)
+    def objectUnderTest = new NcmpEventsService(mockInventoryPersistence, mockNcmpEventsPublisher, mockNcmpEventsCreator)
 
-    def 'Create and Publish event for #operation'() {
+    def 'Create and Publish ncmp event where events are #scenario'() {
         given: 'a cm handle id and operation and responses are mocked'
             mockResponses('test-cm-handle-id', 'test-topic')
+        and: 'notifications enabled is #notificationsEnabled'
+            objectUnderTest.notificationsEnabled = notificationsEnabled
         when: 'service is called to publish ncmp event'
             objectUnderTest.publishNcmpEvent('test-cm-handle-id')
-        then: 'no exception is thrown'
-            noExceptionThrown()
+        then: 'creator is called #expectedTimesMethodCalled times'
+            expectedTimesMethodCalled * mockNcmpEventsCreator.populateNcmpEvent('test-cm-handle-id', _)
+        and: 'publisher is called #expectedTimesMethodCalled times'
+            expectedTimesMethodCalled * mockNcmpEventsPublisher.publishEvent(*_)
+        where: 'the following values are used'
+            scenario   | notificationsEnabled|| expectedTimesMethodCalled
+            'enabled'  | true                || 1
+            'disabled' | false               || 0
     }
 
     def mockResponses(cmHandleId, topicName) {
@@ -50,9 +58,8 @@ class NcmpEventsServiceSpec extends Specification {
         def ncmpServiceCmhandle = YangDataConverter.convertYangModelCmHandleToNcmpServiceCmHandle(yangModelCmHandle)
 
         mockInventoryPersistence.getYangModelCmHandle(cmHandleId) >> yangModelCmHandle
-        mockNcmpEventsMapper.populateNcmpEvent(cmHandleId, ncmpServiceCmhandle) >> ncmpEvent
+        mockNcmpEventsCreator.populateNcmpEvent(cmHandleId, ncmpServiceCmhandle) >> ncmpEvent
         mockNcmpEventsPublisher.publishEvent(topicName, cmHandleId, ncmpEvent) >> {}
     }
 
-
 }
index 2667ef4..2d8f7fb 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * ============LICENSE_START=======================================================
  * Copyright (c) 2021 Bell Canada.
+ * Modifications Copyright (C) 2022 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,7 +34,7 @@ import org.springframework.validation.annotation.Validated;
 
 @EnableAsync
 @Configuration
-@ConditionalOnProperty(name = "notification.async.enabled", havingValue = "true", matchIfMissing = false)
+@ConditionalOnProperty(name = "notification.enabled", havingValue = "true", matchIfMissing = true)
 @ConfigurationProperties("notification.async.executor")
 @Validated
 @Setter
index eb75e3f..3776a93 100644 (file)
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Map;
 import javax.validation.constraints.NotNull;
 import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 import org.springframework.validation.annotation.Validated;
@@ -36,6 +37,7 @@ public class NotificationProperties {
     @NotNull
     private String topic;
     private Map<String, String> filters = Collections.emptyMap();
-    @NotNull
-    private boolean enabled = false;
+
+    @Value("${notification.enabled:true}")
+    private boolean enabled;
 }
index a28b400..04295eb 100644 (file)
 #  ============LICENSE_END=========================================================
 
 notification:
+  enabled: true
   data-updated:
     filters:
       enabled-dataspaces: ".*-published,.*-important"
-    enabled: true
     topic: cps-event
   async:
-    enabled: true
     executor:
       core-pool-size: 2
       max-pool-size: 10
old mode 100755 (executable)
new mode 100644 (file)
index 5954240..43575f9 100755 (executable)
@@ -61,12 +61,12 @@ mkdir -p $WORKSPACE/archives/dc-cps
 cp $WORKSPACE/../docker-compose/*.yml $WORKSPACE/archives/dc-cps
 cd $WORKSPACE/archives/dc-cps
 
-# download docker-compose of a required version (1.25.0 supports configuration of version 3.7)
-curl -L https://github.com/docker/compose/releases/download/1.25.0/docker-compose-`uname -s`-`uname -m` > docker-compose
+curl -L https://github.com/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` > docker-compose
 chmod +x docker-compose
+docker-compose version
 
 # start CPS/NCMP, DMI, and PostgreSQL containers with docker compose
-./docker-compose up -d
+docker-compose up -d
 
 ###################### setup sdnc #######################################
 source $WORKSPACE/plans/cps/sdnc/sdnc_setup.sh
old mode 100755 (executable)
new mode 100644 (file)
index 9edea35..eafcb3c
 # limitations under the License.
 # ============LICENSE_END=========================================================
 
-version: "3.7"
+version: '3.3'
 
 services:
-  ### Services cps-service, cps-ncmp, zookeeper and kafka are commented below, these
-  ### services can be un-commented and used on need to use basis. Only minimal
-  ### services will run (dbpostgresql, cps-and-ncmp and ncmp-dmi-plugin) by default.
 
-  #cps-standalone:
-  #  container_name: cps-service
-  #  image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/cps-service:${VERSION:-latest}
-  #  ports:
-  #    - "8881:8080"
-  #    - "8887:8081"
-  #  environment:
-  #    CPS_USERNAME: ${CPS_CORE_USERNAME:-cpsuser}
-  #    CPS_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!}
-  #    DB_HOST: dbpostgresql
-  #    DB_USERNAME: ${DB_USERNAME:-cps}
-  #    DB_PASSWORD: ${DB_PASSWORD:-cps}
-  #    #KAFKA_BOOTSTRAP_SERVER: kafka:9092
-  #    #notification.data-updated.enabled: 'true'
-  #    #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
-  #  restart: unless-stopped
-  #  depends_on:
-  #    - dbpostgresql
-
-  #ncmp-standalone:
-  #  container_name: cps-ncmp
-  #  image: ${DOCKER_REPO:-nexus3.onap.org:10003}/onap/cps-ncmp:${VERSION:-latest}
-  #  ports:
-  #    - "8882:8080"
-  #    - "8887:8081"
-  #  environment:
-  #    CPS_USERNAME: ${CPS_CORE_USERNAME:-cpsuser}
-  #    CPS_PASSWORD: ${CPS_CORE_PASSWORD:-cpsr0cks!}
-  #    DB_HOST: dbpostgresql
-  #    DB_USERNAME: ${DB_USERNAME:-cps}
-  #    DB_PASSWORD: ${DB_PASSWORD:-cps}
-  #    DMI_USERNAME: ${DMI_USERNAME:-cpsuser}
-  #    DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
-  #    #KAFKA_BOOTSTRAP_SERVER: kafka:9092
-  #    #notification.data-updated.enabled: 'true'
-  #    #NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
-  #  restart: unless-stopped
-  #  depends_on:
-  #    - dbpostgresql
-
-  #  zookeeper:
-  #    image: confluentinc/cp-zookeeper:6.2.1
-  #    environment:
-  #      ZOOKEEPER_CLIENT_PORT: 2181
-  #      ZOOKEEPER_TICK_TIME: 2000
-  #    ports:
-  #      - 22181:2181
-  #
-  #  kafka:
-  #    image: confluentinc/cp-kafka:6.2.1
-  #    depends_on:
-  #      - zookeeper
-  #    ports:
-  #      - 29092:29092
-  #    environment:
-  #      KAFKA_BROKER_ID: 1
-  #      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-  #      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
-  #      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-  #      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
-  #      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+  ### docker-compose up -d -> run ALL services ###
+  ### to disable notifications make notification.enabled to false & comment out kafka/zookeeper services ###
 
   dbpostgresql:
     container_name: dbpostgresql
@@ -110,7 +48,8 @@ services:
       DMI_USERNAME: ${DMI_USERNAME:-cpsuser}
       DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
       KAFKA_BOOTSTRAP_SERVER: kafka:9092
-      notification.data-updated.enabled: 'true'
+      notification.enabled: 'true'
+      notification.async.executor.time-out-value-in-ms: 2000
       NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
       TIMERS_ADVISED-MODULES-SYNC_SLEEP-TIME-MS: 2000
     restart: unless-stopped
index 7dd4494..c0d8b60 100644 (file)
@@ -246,15 +246,12 @@ Any spring supported property can be configured by providing in ``config.additio
 | config.additional.                    | Kafka topic to publish to cps-temporal                                                                  | ``cps.data-updated-events``   |
 | notification.data-updated.topic       |                                                                                                         |                               |
 +---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+
-| config.additional.                    | If notification from cps-core to cps-temporal is enabled or not.                                        | ``true``                      |
-| notification.data-updated.enabled     | If this is set to false, then the config.publisher properties could be skipped.                         |                               |
-+---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+
 | config.additional.                    | Dataspaces to be enabled for publishing events to cps-temporal                                          | ````                          |
 | notification.data-updated.filters.    |                                                                                                         |                               |
 | enabled-dataspaces                    |                                                                                                         |                               |
 +---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+
-| config.additional.                    | If notifications should be processed in synchronous or asynchronous manner                              | ``false``                     |
-| notification.async.enabled            |                                                                                                         |                               |
+| config.additional.                    | If asynchronous messaging, user notifications, and updated event persistence should be enabled          | ``true``                     |
+| notification.enabled                  |                                                                                                         |                               |
 +---------------------------------------+---------------------------------------------------------------------------------------------------------+-------------------------------+
 | config.additional.                    | Core pool size in asynchronous execution of notification.                                               | ``2``                         |
 | notification.async.executor.          |                                                                                                         |                               |