Kafka consumer can not be turned off
[cps.git] / cps-ncmp-rest / src / main / java / org / onap / cps / ncmp / rest / controller / NetworkCmProxyController.java
index fb234ef..5703d5e 100755 (executable)
@@ -28,7 +28,6 @@ import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum
 import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum.PATCH;
 import static org.onap.cps.ncmp.api.impl.operations.DmiRequestBody.OperationEnum.UPDATE;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -75,9 +74,10 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
     private final NcmpRestInputMapper ncmpRestInputMapper;
     private final RestOutputCmHandleStateMapper restOutputCmHandleStateMapper;
     private final CpsNcmpTaskExecutor cpsNcmpTaskExecutor;
-
     @Value("${notification.async.executor.time-out-value-in-ms:2000}")
     private int timeOutInMilliSeconds;
+    @Value("${notification.enabled:true}")
+    private boolean asyncEnabled;
 
     /**
      * Get resource data from operational datastore.
@@ -93,15 +93,18 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
                                                                         final @NotNull @Valid String resourceIdentifier,
                                                                         final @Valid String optionsParamInQuery,
                                                                         final @Valid String topicParamInQuery) {
-        if (isValidTopic(topicParamInQuery)) {
+        if (asyncEnabled && isValidTopic(topicParamInQuery)) {
             final String requestId = UUID.randomUUID().toString();
+            log.info("Received Async passthrough-operational request with id {}", requestId);
             cpsNcmpTaskExecutor.executeTask(() ->
-                networkCmProxyDataService.getResourceDataOperationalForCmHandle(
-                    cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
-                        requestId
-                ), timeOutInMilliSeconds
+                    networkCmProxyDataService.getResourceDataOperationalForCmHandle(
+                        cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, requestId
+                    ), timeOutInMilliSeconds
             );
-            return acknowledgeAsyncRequest(requestId);
+            return ResponseEntity.ok(Map.of("requestId", requestId));
+        } else {
+            log.warn("Asynchronous messaging is currently disabled for passthrough-operational."
+                + " Will use synchronous operation.");
         }
 
         final Object responseObject = networkCmProxyDataService.getResourceDataOperationalForCmHandle(
@@ -124,15 +127,18 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
                                                                     final @NotNull @Valid String resourceIdentifier,
                                                                     final @Valid String optionsParamInQuery,
                                                                     final @Valid String topicParamInQuery) {
-        if (isValidTopic(topicParamInQuery)) {
-            final String resourceDataRequestId = UUID.randomUUID().toString();
+        if (asyncEnabled && isValidTopic(topicParamInQuery)) {
+            final String requestId = UUID.randomUUID().toString();
+            log.info("Received Async passthrough-running request with id {}", requestId);
             cpsNcmpTaskExecutor.executeTask(() ->
                 networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
-                    cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery,
-                        resourceDataRequestId
+                    cmHandle, resourceIdentifier, optionsParamInQuery, topicParamInQuery, requestId
                 ), timeOutInMilliSeconds
             );
-            return acknowledgeAsyncRequest(resourceDataRequestId);
+            return ResponseEntity.ok(Map.of("requestId", requestId));
+        } else {
+            log.warn("Asynchronous messaging is currently disabled for passthrough-running."
+                + " Will use synchronous operation.");
         }
 
         final Object responseObject = networkCmProxyDataService.getResourceDataPassThroughRunningForCmHandle(
@@ -301,11 +307,5 @@ public class NetworkCmProxyController implements NetworkCmProxyApi {
         throw new InvalidTopicException("Topic name " + topicName + " is invalid", "invalid topic");
     }
 
-    private ResponseEntity<Object> acknowledgeAsyncRequest(final String requestId) {
-        final Map<String, Object> acknowledgeData = new HashMap<>(1);
-        acknowledgeData.put("requestId", requestId);
-        return ResponseEntity.ok(acknowledgeData);
-    }
-
 }