import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
import io.grpc.ManagedChannel
-import kotlinx.coroutines.*
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withTimeout
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
}
@Service
-@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
- havingValue = "true", matchIfMissing = false)
-class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
- : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+@ConditionalOnProperty(
+ prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
+ havingValue = "true", matchIfMissing = false
+)
+class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService) :
+ StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
private val log = logger(StreamingRemoteExecutionServiceImpl::class)
private val commChannels: MutableMap<String,
ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
-
/**
* Open new channel to send and receive for grpc properties [selector] for [txId],
* Create the only one GRPC channel per host port and reuse for further communication.
}
val commChannel = commChannels[txId]
- ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
+ ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
log.info("created subscription for transactionId($txId)")
*/
override suspend fun send(txId: String, input: ExecutionServiceInput) {
val sendChannel = commChannels[txId]?.requestChannel
- ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
coroutineScope {
launch {
sendChannel.send(input)
* so the correlation is sub request id to receive the response.
*/
@ExperimentalCoroutinesApi
- override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
- : ExecutionServiceOutput {
+ override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long):
+ ExecutionServiceOutput {
var output: ExecutionServiceOutput? = null
val flow = openSubscription(selector, txId)
/** Send the request */
val sendChannel = commChannels[txId]?.requestChannel
- ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
sendChannel.send(input)
/** Receive the response with timeout */
if (!it.requestChannel.isClosedForSend)
it.requestChannel.close()
/** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
- //it.responseChannel.cancel(CancellationException("subscription cancelled"))
+ // it.responseChannel.cancel(CancellationException("subscription cancelled"))
commChannels.remove(txId)
log.info("closed subscription for transactionId($txId)")
}
suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
- .blueprintGrpcClientService(grpcProperties)
+ .blueprintGrpcClientService(grpcProperties)
return grpcClientService.channel()
}