import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
import io.grpc.ManagedChannel
-import kotlinx.coroutines.*
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withTimeout
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
-import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BlueprintGrpcClientService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BlueprintGrpcLibPropertyService
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BlueprintProcessingServiceGrpc
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
}
@Service
-@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
- havingValue = "true", matchIfMissing = false)
-class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
- : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+@ConditionalOnProperty(
+ prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
+ havingValue = "true", matchIfMissing = false
+)
+class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BlueprintGrpcLibPropertyService) :
+ StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
private val log = logger(StreamingRemoteExecutionServiceImpl::class)
private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
private val commChannels: MutableMap<String,
- ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
-
+ ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
/**
* Open new channel to send and receive for grpc properties [selector] for [txId],
val grpcChannel = grpcChannel(selector)
/** Get Send and Receive Channel for bidirectional process method*/
- val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
+ val channels = clientCallBidiStreaming(BlueprintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
commChannels[txId] = channels
}
val commChannel = commChannels[txId]
- ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
+ ?: throw BlueprintException("failed to create response subscription for transactionId($txId) channel")
log.info("created subscription for transactionId($txId)")
*/
override suspend fun send(txId: String, input: ExecutionServiceInput) {
val sendChannel = commChannels[txId]?.requestChannel
- ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ ?: throw BlueprintException("failed to get transactionId($txId) send channel")
coroutineScope {
launch {
sendChannel.send(input)
* so the correlation is sub request id to receive the response.
*/
@ExperimentalCoroutinesApi
- override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
- : ExecutionServiceOutput {
-
- var output: ExecutionServiceOutput? = null
- val flow = openSubscription(selector, txId)
-
- /** Send the request */
- val sendChannel = commChannels[txId]?.requestChannel
- ?: throw BluePrintException("failed to get transactionId($txId) send channel")
- sendChannel.send(input)
-
- /** Receive the response with timeout */
- withTimeout(timeOutMill) {
- flow.collect {
- log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
- if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
- output = it
- cancelSubscription(txId)
+ override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long):
+ ExecutionServiceOutput {
+
+ var output: ExecutionServiceOutput? = null
+ val flow = openSubscription(selector, txId)
+
+ /** Send the request */
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BlueprintException("failed to get transactionId($txId) send channel")
+ sendChannel.send(input)
+
+ /** Receive the response with timeout */
+ withTimeout(timeOutMill) {
+ flow.collect {
+ log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ output = it
+ cancelSubscription(txId)
+ }
}
}
+ return output!!
}
- return output!!
- }
/** Cancel the Subscription for the [txId], This closes communication channel **/
@ExperimentalCoroutinesApi
if (!it.requestChannel.isClosedForSend)
it.requestChannel.close()
/** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
- //it.responseChannel.cancel(CancellationException("subscription cancelled"))
+ // it.responseChannel.cancel(CancellationException("subscription cancelled"))
commChannels.remove(txId)
log.info("closed subscription for transactionId($txId)")
}
}
suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
- val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
- .blueprintGrpcClientService(grpcProperties)
+ val grpcClientService: BlueprintGrpcClientService = bluePrintGrpcLibPropertyService
+ .blueprintGrpcClientService(grpcProperties)
return grpcClientService.channel()
}
selector
}
else -> {
- throw BluePrintException("couldn't process selector($selector)")
+ throw BlueprintException("couldn't process selector($selector)")
}
}
}