2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
21 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
22 import io.grpc.ManagedChannel
23 import kotlinx.coroutines.*
24 import kotlinx.coroutines.flow.Flow
25 import kotlinx.coroutines.flow.collect
26 import kotlinx.coroutines.flow.consumeAsFlow
27 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
28 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
29 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
30 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
31 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
32 import org.onap.ccsdk.cds.controllerblueprints.core.logger
33 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
36 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
37 import org.springframework.stereotype.Service
39 interface StreamingRemoteExecutionService<ReqT, ResT> {
41 suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
43 suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
45 suspend fun send(txId: String, input: ReqT)
47 suspend fun cancelSubscription(txId: String)
49 suspend fun closeChannel(selector: Any)
53 @ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
54 havingValue = "true", matchIfMissing = false)
55 class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
56 : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
58 private val log = logger(StreamingRemoteExecutionServiceImpl::class)
60 private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
62 private val commChannels: MutableMap<String,
63 ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
67 * Open new channel to send and receive for grpc properties [selector] for [txId],
68 * Create the only one GRPC channel per host port and reuse for further communication.
69 * Create request communication channel to send and receive requests and responses.
70 * We can send multiple request with same txId.
71 * Consume the flow for responses,
72 * Client should cancel the subscription for the request Id once no longer response is needed.
75 override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
77 if (!commChannels.containsKey(txId)) {
78 /** Get GRPC Channel*/
79 val grpcChannel = grpcChannel(selector)
81 /** Get Send and Receive Channel for bidirectional process method*/
82 val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
83 commChannels[txId] = channels
86 val commChannel = commChannels[txId]
87 ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
89 log.info("created subscription for transactionId($txId)")
91 return commChannel.responseChannel.consumeAsFlow()
95 * Send the [input] request, by reusing same GRPC channel and Communication channel
98 override suspend fun send(txId: String, input: ExecutionServiceInput) {
99 val sendChannel = commChannels[txId]?.requestChannel
100 ?: throw BluePrintException("failed to get transactionId($txId) send channel")
103 sendChannel.send(input)
104 log.trace("Message sent for transactionId($txId)")
110 * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
111 * for the GRPC [selector] with execution [timeOutMill].
112 * Other state of the request will be skipped.
113 * The assumption here is you can call this API with same request Id and unique subrequest Id,
114 * so the correlation is sub request id to receive the response.
116 @ExperimentalCoroutinesApi
117 override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
118 : ExecutionServiceOutput {
120 var output: ExecutionServiceOutput? = null
121 val flow = openSubscription(selector, txId)
123 /** Send the request */
124 val sendChannel = commChannels[txId]?.requestChannel
125 ?: throw BluePrintException("failed to get transactionId($txId) send channel")
126 sendChannel.send(input)
128 /** Receive the response with timeout */
129 withTimeout(timeOutMill) {
131 log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
132 if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
134 cancelSubscription(txId)
141 /** Cancel the Subscription for the [txId], This closes communication channel **/
142 @ExperimentalCoroutinesApi
143 override suspend fun cancelSubscription(txId: String) {
144 commChannels[txId]?.let {
145 if (!it.requestChannel.isClosedForSend)
146 it.requestChannel.close()
147 /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
148 //it.responseChannel.cancel(CancellationException("subscription cancelled"))
149 commChannels.remove(txId)
150 log.info("closed subscription for transactionId($txId)")
154 /** Close the GRPC channel for the host port poperties [selector]*/
155 override suspend fun closeChannel(selector: Any) {
156 val grpcProperties = grpcProperties(selector)
157 val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
158 if (grpcChannels.containsKey(selectorName)) {
159 grpcChannels[selectorName]!!.shutdownNow()
160 grpcChannels.remove(selectorName)
161 log.info("grpc channel($selectorName) shutdown completed")
165 /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */
166 private suspend fun grpcChannel(selector: Any): ManagedChannel {
167 val grpcProperties = grpcProperties(selector)
168 val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
169 val isGrpcChannelCached = grpcChannels.containsKey(selectorName)
170 val grpcChannel = if (isGrpcChannelCached) {
171 if (grpcChannels[selectorName]!!.isShutdown) {
172 createGrpcChannel(grpcProperties)
174 grpcChannels[selectorName]!!
177 createGrpcChannel(grpcProperties)
179 grpcChannels[selectorName] = grpcChannel
183 suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
184 val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
185 .blueprintGrpcClientService(grpcProperties)
186 return grpcClientService.channel()
189 private fun grpcProperties(selector: Any): GrpcClientProperties {
190 return when (selector) {
192 bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString())
195 bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
197 is GrpcClientProperties -> {
201 throw BluePrintException("couldn't process selector($selector)")