2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
21 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
22 import io.grpc.ManagedChannel
23 import kotlinx.coroutines.ExperimentalCoroutinesApi
24 import kotlinx.coroutines.FlowPreview
25 import kotlinx.coroutines.coroutineScope
26 import kotlinx.coroutines.flow.Flow
27 import kotlinx.coroutines.flow.collect
28 import kotlinx.coroutines.flow.consumeAsFlow
29 import kotlinx.coroutines.launch
30 import kotlinx.coroutines.withTimeout
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
33 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
34 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
35 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
38 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
40 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
41 import org.springframework.stereotype.Service
/**
 * Contract for a streaming remote execution client.
 *
 * Subscriptions are identified by a transaction id (`txId`); the remote endpoint is
 * identified by a `selector`.
 *
 * @param ReqT type of the request messages sent on a subscription.
 * @param ResT type of the response messages received from a subscription.
 */
interface StreamingRemoteExecutionService<ReqT, ResT> {

    /** Opens the subscription for [txId] on the endpoint described by [selector] and returns its responses as a [Flow]. */
    suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>

    /** Sends [input] for [txId] on the endpoint described by [selector] and returns a single response, bounded by [timeOutMill]. */
    suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT

    /** Sends [input] on the subscription already opened for [txId]. */
    suspend fun send(txId: String, input: ReqT)

    /** Cancels the subscription opened for [txId]. */
    suspend fun cancelSubscription(txId: String)

    /** Closes the underlying channel of the endpoint described by [selector]. */
    suspend fun closeChannel(selector: Any)
}
// Implementation of StreamingRemoteExecutionService for ExecutionServiceInput / ExecutionServiceOutput.
// The annotation below keys on prefix "blueprintsprocessor.streamingRemoteExecution", name "enabled",
// havingValue "true", with matchIfMissing = false.
// NOTE(review): this view of the file has lost short lines (e.g. the closing ")" of the annotation and
// whatever stood on original line 56) — confirm against the original file.
57 @ConditionalOnProperty(
58 prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
59 havingValue = "true", matchIfMissing = false
61 class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService) :
62 StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
64 private val log = logger(StreamingRemoteExecutionServiceImpl::class)
// GRPC channels keyed by "host:port" (the key built in grpcChannel / closeChannel).
66 private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
// Bidirectional call channels (request + response side) keyed by transaction id.
// NOTE(review): both maps are plain hashMapOf() instances with no synchronization visible here —
// confirm that concurrent access is not expected.
68 private val commChannels: MutableMap<String,
69 ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
/**
 * Opens the send/receive subscription for [txId] against the GRPC endpoint described by [selector].
 *
 * Only one GRPC channel is created per host:port and it is reused for further communication.
 * One bidirectional communication channel is kept per [txId], so multiple requests can be sent
 * with the same [txId]. The caller consumes the returned flow for responses and should cancel
 * the subscription for the [txId] once no further responses are needed.
 *
 * @param selector GRPC properties selector, resolved to host/port by [grpcChannel].
 * @param txId transaction id that keys the communication channel.
 * @return the response side of the communication channel, exposed as a [Flow].
 */
@FlowPreview
override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
    // getOrPut starts the bidirectional `process` call only the first time this txId is seen and
    // always yields a non-null channel, so no separate "failed to create" null check is needed.
    val commChannel = commChannels.getOrPut(txId) {
        clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel(selector))
    }

    log.info("created subscription for transactionId($txId)")

    return commChannel.responseChannel.consumeAsFlow()
}
/**
 * Send the [input] request, reusing the same GRPC channel and communication channel
 * for the request Id.
 */
// Looks up the request channel registered for [txId] in commChannels and sends [input] on it.
// Throws BluePrintException when no communication channel is registered for [txId].
103 override suspend fun send(txId: String, input: ExecutionServiceInput) {
104 val sendChannel = commChannels[txId]?.requestChannel
105 ?: throw BluePrintException("failed to get transactionId($txId) send channel")
// NOTE(review): the embedded line numbers jump from 105 to 108, so lines are missing from this view
// (possibly a coroutineScope/launch wrapper, given the file's imports) — confirm against the original.
108 sendChannel.send(input)
109 log.trace("Message sent for transactionId($txId)")
/**
 * Simplified version of the streaming calls: use this API when listening only for the actual response to [input]
 * from the GRPC [selector], within the execution timeout [timeOutMill].
 * Other states of the request are skipped.
 * The assumption here is that you can call this API with the same request Id and a unique sub-request Id,
 * so the sub-request Id is the correlation used to receive the response.
 */
// Opens the subscription for [txId] via openSubscription, sends [input] on its request channel and,
// under withTimeout(timeOutMill), reacts to responses whose status event type is EVENT_COMPONENT_EXECUTED
// by cancelling the subscription.
// Throws BluePrintException when no request channel is registered for [txId].
// NOTE(review): this view of the method is incomplete — the embedded line numbers skip 135, 138 and
// everything after 139, so the lambda that binds `it`, any assignment to `output` and the return
// statement are not visible. Confirm the details against the original file.
121 @ExperimentalCoroutinesApi
122 override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long):
123 ExecutionServiceOutput {
// Nullable holder for the result; where it is assigned is not visible in this view.
125 var output: ExecutionServiceOutput? = null
126 val flow = openSubscription(selector, txId)
128 /** Send the request */
129 val sendChannel = commChannels[txId]?.requestChannel
130 ?: throw BluePrintException("failed to get transactionId($txId) send channel")
131 sendChannel.send(input)
133 /** Receive the response with timeout */
134 withTimeout(timeOutMill) {
// NOTE(review): `it` below is presumably each response collected from `flow` — the collecting call
// (original line 135) is missing from this view.
136 log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
// Responses with any other event type only produce the trace log above.
137 if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
139 cancelSubscription(txId)
/**
 * Cancels the subscription registered for [txId].
 *
 * Closes the request (send) side of the communication channel if it is still open, then
 * removes the entry from [commChannels]. Does nothing when no subscription exists for [txId].
 */
@ExperimentalCoroutinesApi
override suspend fun cancelSubscription(txId: String) {
    val subscription = commChannels[txId] ?: return

    val requestChannel = subscription.requestChannel
    if (!requestChannel.isClosedForSend) {
        requestChannel.close()
    }

    // The response channel is not cancelled here. If the receive channel has to close
    // immediately once the subscription is cancelled, enable:
    // subscription.responseChannel.cancel(CancellationException("subscription cancelled"))
    commChannels.remove(txId)
    log.info("closed subscription for transactionId($txId)")
}
/**
 * Closes the GRPC channel cached for the host/port properties resolved from [selector].
 *
 * The channel is shut down with `shutdownNow()` and removed from [grpcChannels].
 * Does nothing when no channel is cached for that host:port.
 *
 * @param selector GRPC properties selector, resolved by [grpcProperties].
 */
override suspend fun closeChannel(selector: Any) {
    val grpcProperties = grpcProperties(selector)
    val selectorName = "${grpcProperties.host}:${grpcProperties.port}"

    // One null-safe lookup replaces the containsKey check followed by a `!!` dereference.
    val cachedChannel = grpcChannels[selectorName] ?: return

    cachedChannel.shutdownNow()
    grpcChannels.remove(selectorName)
    log.info("grpc channel($selectorName) shutdown completed")
}
/**
 * Returns the GRPC channel for the host/port resolved from [selector].
 *
 * A cached channel is reused as long as it has not been shut down; otherwise a new channel
 * is created through [createGrpcChannel] and cached under the "host:port" key.
 *
 * @param selector GRPC properties selector, resolved by [grpcProperties].
 * @return a cached or newly created channel for the resolved host:port.
 */
private suspend fun grpcChannel(selector: Any): ManagedChannel {
    val grpcProperties = grpcProperties(selector)
    val selectorName = "${grpcProperties.host}:${grpcProperties.port}"

    // Single lookup: a live cached channel is returned as is, with no `!!` and no re-put.
    val cachedChannel = grpcChannels[selectorName]
    if (cachedChannel != null && !cachedChannel.isShutdown) {
        return cachedChannel
    }

    // Missing or already shut down: create a replacement and cache it under the same key.
    return createGrpcChannel(grpcProperties).also { grpcChannels[selectorName] = it }
}
/**
 * Obtains the GRPC client service for [grpcProperties] from the property service and returns its channel.
 *
 * @param grpcProperties client properties describing the remote endpoint.
 * @return the channel provided by the resolved client service.
 */
suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel =
    bluePrintGrpcLibPropertyService
        .blueprintGrpcClientService(grpcProperties)
        .channel()
// Resolves [selector] to GrpcClientProperties by dispatching on its runtime type.
// Throws BluePrintException when the selector cannot be processed.
// NOTE(review): this view is incomplete — the embedded line numbers skip 196, 198-199, 201, 203-205 and
// everything after 206, so two of the branch conditions, the body of the GrpcClientProperties branch
// and the closing braces are not visible. Confirm against the original file.
194 private fun grpcProperties(selector: Any): GrpcClientProperties {
195 return when (selector) {
// Branch whose condition is not visible: looks the properties up by the selector's string form.
197 bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString())
// Branch whose condition is not visible: passes the selector itself to the property service
// (presumably smart-cast to JsonNode, which the file imports — verify).
200 bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
202 is GrpcClientProperties -> {
// Presumably the fallback branch for unsupported selector types (its `else` line is not visible).
206 throw BluePrintException("couldn't process selector($selector)")