Bi-directional GRPC non interactive implementation 43/96643/1
authorBrinda Santh <bs2796@att.com>
Fri, 4 Oct 2019 21:24:16 +0000 (17:24 -0400)
committerBrinda Santh <bs2796@att.com>
Fri, 4 Oct 2019 21:24:16 +0000 (17:24 -0400)
Issue-ID: CCSDK-1747
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I236bf6d4edaf983ca4162b5ce064736ac690b504

ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml

index 0784816..adb1d67 100644 (file)
@@ -20,15 +20,14 @@ import com.fasterxml.jackson.databind.JsonNode
 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
 import io.grpc.ManagedChannel
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.consumeAsFlow
-import kotlinx.coroutines.launch
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
@@ -39,11 +38,13 @@ import org.springframework.stereotype.Service
 
 interface StreamingRemoteExecutionService<ReqT, ResT> {
 
-    suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT>
+    suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
 
-    suspend fun send(input: ReqT)
+    suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
 
-    suspend fun cancelSubscription(requestId: String)
+    suspend fun send(txId: String, input: ReqT)
+
+    suspend fun cancelSubscription(txId: String)
 
     suspend fun closeChannel(selector: Any)
 }
@@ -63,60 +64,90 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
 
 
     /**
-     * Open new channel to send and receive for grpc properties [selector] for [requestId],
+     * Open new channel to send and receive for grpc properties [selector] for [txId],
      * Create the only one GRPC channel per host port and reuse for further communication.
      * Create request communication channel to send and receive requests and responses.
-     * We can send multiple request with same requestId with unique subRequestId.
+     * We can send multiple request with same txId.
      * Consume the flow for responses,
      * Client should cancel the subscription for the request Id once no longer response is needed.
      * */
     @FlowPreview
-    override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> {
+    override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
 
-        if (!commChannels.containsKey(requestId)) {
+        if (!commChannels.containsKey(txId)) {
             /** Get GRPC Channel*/
             val grpcChannel = grpcChannel(selector)
 
             /** Get Send and Receive Channel for bidirectional process method*/
             val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
-            commChannels[requestId] = channels
+            commChannels[txId] = channels
         }
 
-        val commChannel = commChannels[requestId]
-                ?: throw BluePrintException("failed to create response subscription for request($requestId) channel")
+        val commChannel = commChannels[txId]
+                ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
 
-        log.info("created subscription for request($requestId)")
+        log.info("created subscription for transactionId($txId)")
 
         return commChannel.responseChannel.consumeAsFlow()
     }
 
     /**
-     * Send the [input]request, by reusing same GRPC channel and Communication channel
+     * Send the [input] request, by reusing same GRPC channel and Communication channel
      * for the request Id.
      */
-    override suspend fun send(input: ExecutionServiceInput) {
-        val requestId = input.commonHeader.requestId
-        val subRequestId = input.commonHeader.subRequestId
-        val sendChannel = commChannels[requestId]?.requestChannel
-                ?: throw BluePrintException("failed to get request($requestId) send channel")
+    override suspend fun send(txId: String, input: ExecutionServiceInput) {
+        val sendChannel = commChannels[txId]?.requestChannel
+                ?: throw BluePrintException("failed to get transactionId($txId) send channel")
         coroutineScope {
             launch {
                 sendChannel.send(input)
-                log.trace("Message sent for request($requestId) : subRequest($subRequestId)")
+                log.trace("Message sent for transactionId($txId)")
+            }
+        }
+    }
+
+    /**
+     * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
+     * for the GRPC [selector] with execution [timeOutMill].
+     * Other state of the request will be skipped.
+     * The assumption here is you can call this API with same request Id and unique subrequest Id,
+     * so the correlation is sub request id to receive the response.
+     */
+    @ExperimentalCoroutinesApi
+    override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
+            : ExecutionServiceOutput {
+
+        var output: ExecutionServiceOutput? = null
+        val flow = openSubscription(selector, txId)
+
+        /** Send the request */
+        val sendChannel = commChannels[txId]?.requestChannel
+                ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+        sendChannel.send(input)
+
+        /** Receive the response with timeout */
+        withTimeout(timeOutMill) {
+            flow.collect {
+                log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
+                if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+                    output = it
+                    cancelSubscription(txId)
+                }
             }
         }
+        return output!!
     }
 
-    /** Cancel the Subscription for the [requestId], This closes communication channel **/
+    /** Cancel the Subscription for the [txId], This closes communication channel **/
     @ExperimentalCoroutinesApi
-    override suspend fun cancelSubscription(requestId: String) {
-        commChannels[requestId]?.let {
+    override suspend fun cancelSubscription(txId: String) {
+        commChannels[txId]?.let {
             if (!it.requestChannel.isClosedForSend)
                 it.requestChannel.close()
             /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
             //it.responseChannel.cancel(CancellationException("subscription cancelled"))
-            commChannels.remove(requestId)
-            log.info("closed subscription for request($requestId)")
+            commChannels.remove(txId)
+            log.info("closed subscription for transactionId($txId)")
         }
     }
 
index c9ff235..29d24c6 100644 (file)
@@ -36,6 +36,8 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
 import java.util.*
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
 
 
 class StreamingRemoteExecutionServiceTest {
@@ -69,25 +71,48 @@ class StreamingRemoteExecutionServiceTest {
 
             val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
             /** To test with real server, uncomment below line */
-            coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+            coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+
+            /** Test Send and Receive non interactive transaction */
+            val nonInteractiveDeferred = arrayListOf<Deferred<*>>()
+            repeat(2) { count ->
+                val requestId = "1234-$count"
+                val request = getRequest(requestId)
+                val invocationId = request.commonHeader.subRequestId
+                val deferred = async {
+                    val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties,
+                            invocationId, request, 1000L)
+                    assertNotNull(response, "failed to get non interactive response")
+                    assertEquals(response.commonHeader.requestId, requestId,
+                            "failed to match non interactive response id")
+                    assertEquals(response.status.eventType, EventType.EVENT_COMPONENT_EXECUTED,
+                            "failed to match non interactive response type")
+                }
+                nonInteractiveDeferred.add(deferred)
 
-            val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+            }
+            nonInteractiveDeferred.awaitAll()
 
-            repeat(1) { count ->
+            /** Test Send and Receive interactive transaction */
+            val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+            repeat(2) { count ->
                 val requestId = "12345-$count"
-                val responseFlow = spyStreamingRemoteExecutionService.openSubscription(tokenAuthGrpcClientProperties, requestId)
+                val request = getRequest(requestId)
+                val invocationId = request.commonHeader.requestId
+                val responseFlow = spyStreamingRemoteExecutionService
+                        .openSubscription(tokenAuthGrpcClientProperties, invocationId)
 
                 val deferred = async {
                     responseFlow.collect {
-                        log.info("Received $count-response (${it.commonHeader.subRequestId}) : ${it.status.eventType}")
+                        log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
                         if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
-                            spyStreamingRemoteExecutionService.cancelSubscription(it.commonHeader.requestId)
+                            spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
                         }
                     }
                 }
                 responseFlowsDeferred.add(deferred)
                 /** Sending Multiple messages with same requestId  and different subRequestId */
-                spyStreamingRemoteExecutionService.send(getRequest(requestId))
+                spyStreamingRemoteExecutionService.send(invocationId, request)
             }
             responseFlowsDeferred.awaitAll()
             streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
index 703a526..afe10b3 100644 (file)
@@ -19,7 +19,7 @@
         <!-- encoders are assigned the type
              ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
         <encoder>
-            <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
         </encoder>
     </appender>