Renaming Files having BluePrint to have Blueprint
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / services / execution-service / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / services / execution / StreamingRemoteExecutionService.kt
index adb1d67..dadf72e 100644 (file)
@@ -20,17 +20,21 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
 import io.grpc.ManagedChannel
-import kotlinx.coroutines.*
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.coroutineScope
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withTimeout
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BlueprintGrpcClientService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BlueprintGrpcLibPropertyService
 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BlueprintProcessingServiceGrpc
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
@@ -50,18 +54,19 @@ interface StreamingRemoteExecutionService<ReqT, ResT> {
 }
 
 @Service
-@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
-        havingValue = "true", matchIfMissing = false)
-class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
-    : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+@ConditionalOnProperty(
+    prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
+    havingValue = "true", matchIfMissing = false
+)
+class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BlueprintGrpcLibPropertyService) :
+    StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
 
     private val log = logger(StreamingRemoteExecutionServiceImpl::class)
 
     private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
 
     private val commChannels: MutableMap<String,
-            ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
-
+        ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
 
     /**
      * Open new channel to send and receive for grpc properties [selector] for [txId],
@@ -79,12 +84,12 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
             val grpcChannel = grpcChannel(selector)
 
             /** Get Send and Receive Channel for bidirectional process method*/
-            val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
+            val channels = clientCallBidiStreaming(BlueprintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
             commChannels[txId] = channels
         }
 
         val commChannel = commChannels[txId]
-                ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
+            ?: throw BlueprintException("failed to create response subscription for transactionId($txId) channel")
 
         log.info("created subscription for transactionId($txId)")
 
@@ -97,7 +102,7 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
      */
     override suspend fun send(txId: String, input: ExecutionServiceInput) {
         val sendChannel = commChannels[txId]?.requestChannel
-                ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+            ?: throw BlueprintException("failed to get transactionId($txId) send channel")
         coroutineScope {
             launch {
                 sendChannel.send(input)
@@ -114,29 +119,29 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
      * so the correlation is sub request id to receive the response.
      */
     @ExperimentalCoroutinesApi
-    override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
-            : ExecutionServiceOutput {
-
-        var output: ExecutionServiceOutput? = null
-        val flow = openSubscription(selector, txId)
-
-        /** Send the request */
-        val sendChannel = commChannels[txId]?.requestChannel
-                ?: throw BluePrintException("failed to get transactionId($txId) send channel")
-        sendChannel.send(input)
-
-        /** Receive the response with timeout */
-        withTimeout(timeOutMill) {
-            flow.collect {
-                log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
-                if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
-                    output = it
-                    cancelSubscription(txId)
+    override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long):
+        ExecutionServiceOutput {
+
+            var output: ExecutionServiceOutput? = null
+            val flow = openSubscription(selector, txId)
+
+            /** Send the request */
+            val sendChannel = commChannels[txId]?.requestChannel
+                ?: throw BlueprintException("failed to get transactionId($txId) send channel")
+            sendChannel.send(input)
+
+            /** Receive the response with timeout */
+            withTimeout(timeOutMill) {
+                flow.collect {
+                    log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
+                    if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+                        output = it
+                        cancelSubscription(txId)
+                    }
                 }
             }
+            return output!!
         }
-        return output!!
-    }
 
     /** Cancel the Subscription for the [txId], This closes communication channel **/
     @ExperimentalCoroutinesApi
@@ -145,7 +150,7 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
             if (!it.requestChannel.isClosedForSend)
                 it.requestChannel.close()
             /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
-            //it.responseChannel.cancel(CancellationException("subscription cancelled"))
+            // it.responseChannel.cancel(CancellationException("subscription cancelled"))
             commChannels.remove(txId)
             log.info("closed subscription for transactionId($txId)")
         }
@@ -181,8 +186,8 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
     }
 
     suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
-        val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
-                .blueprintGrpcClientService(grpcProperties)
+        val grpcClientService: BlueprintGrpcClientService = bluePrintGrpcLibPropertyService
+            .blueprintGrpcClientService(grpcProperties)
         return grpcClientService.channel()
     }
 
@@ -198,7 +203,7 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
                 selector
             }
             else -> {
-                throw BluePrintException("couldn't process selector($selector)")
+                throw BlueprintException("couldn't process selector($selector)")
             }
         }
     }