2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.fasterxml.jackson.databind.node.TextNode
21 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
22 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
23 import io.grpc.ManagedChannel
24 import kotlinx.coroutines.ExperimentalCoroutinesApi
25 import kotlinx.coroutines.FlowPreview
26 import kotlinx.coroutines.coroutineScope
27 import kotlinx.coroutines.flow.Flow
28 import kotlinx.coroutines.flow.collect
29 import kotlinx.coroutines.flow.consumeAsFlow
30 import kotlinx.coroutines.launch
31 import kotlinx.coroutines.withTimeout
32 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
33 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
34 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
36 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
37 import org.onap.ccsdk.cds.controllerblueprints.core.logger
38 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
41 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
42 import org.springframework.stereotype.Service
/**
 * Contract for a streaming (bidirectional) remote execution client.
 *
 * A subscription is identified by a transaction id ([txId]); the remote endpoint is identified
 * by an implementation-defined [selector].
 *
 * @param ReqT type of the request messages sent to the remote endpoint.
 * @param ResT type of the response messages received from the remote endpoint.
 */
interface StreamingRemoteExecutionService<ReqT, ResT> {

    /**
     * Open a subscription for [txId] against the endpoint described by [selector].
     *
     * @return the stream of responses for the subscription.
     */
    suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>

    /**
     * Send [input] for [txId] to the endpoint described by [selector] and wait for its response.
     *
     * @param timeOutMill time budget for receiving the response, in milliseconds.
     * @return the response for [input].
     */
    suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT

    /** Send [input] on the subscription identified by [txId]. */
    suspend fun send(txId: String, input: ReqT)

    /** Cancel the subscription identified by [txId]. */
    suspend fun cancelSubscription(txId: String)

    /** Close the underlying channel for the endpoint described by [selector]. */
    suspend fun closeChannel(selector: Any)
}
/**
 * GRPC implementation of [StreamingRemoteExecutionService] for the blueprint processing
 * bidirectional `process` method.
 *
 * One [ManagedChannel] is cached per `host:port`, and one bidirectional communication channel
 * is cached per transaction id.
 *
 * The bean is only considered when `blueprintsprocessor.streamingRemoteExecution.enabled=true`.
 */
@Service
@ConditionalOnProperty(
    prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
    havingValue = "true", matchIfMissing = false
)
class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService) :
    StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {

    private val log = logger(StreamingRemoteExecutionServiceImpl::class)

    /** GRPC channels, keyed by `host:port`. */
    // NOTE(review): both caches are plain hash maps; confirm callers never touch them concurrently.
    private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()

    /** Request/response channel pairs of the bidirectional call, keyed by transaction id. */
    private val commChannels: MutableMap<String,
        ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()

    /**
     * Open new channel to send and receive for grpc properties [selector] for [txId],
     * Create only one GRPC channel per host port and reuse it for further communication.
     * Create the request communication channel to send and receive requests and responses.
     * We can send multiple requests with the same txId.
     * Consume the flow for responses,
     * Client should cancel the subscription for the request Id once no further response is needed.
     *
     * @return flow backed by the response channel of the subscription.
     * @throws BluePrintException if [selector] is of an unsupported type.
     */
    @FlowPreview
    override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
        val commChannel = commChannels.getOrPut(txId) {
            /** Get the GRPC channel, then the send and receive channel for the bidirectional process method */
            clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel(selector))
        }

        log.info("created subscription for transactionId($txId)")

        return commChannel.responseChannel.consumeAsFlow()
    }

    /**
     * Send the [input] request, by reusing same GRPC channel and Communication channel
     * for the request Id.
     *
     * @throws BluePrintException if no subscription is open for [txId].
     */
    override suspend fun send(txId: String, input: ExecutionServiceInput) {
        val sendChannel = commChannels[txId]?.requestChannel
            ?: throw BluePrintException("failed to get transactionId($txId) send channel")

        sendChannel.send(input)
        log.trace("Message sent for transactionId($txId)")
    }

    /**
     * Simplified version of Streaming calls, Use this API only for listening for the actual response
     * for [input] for the GRPC [selector] with execution [timeOutMill].
     * Other states of the request will be skipped.
     * The assumption here is you can call this API with same request Id and unique subrequest Id,
     * so the correlation is sub request id to receive the response.
     *
     * The subscription for [txId] is always cancelled before this method returns or throws.
     *
     * @return the response whose event type is [EventType.EVENT_COMPONENT_EXECUTED].
     * @throws BluePrintException if the subscription cannot be used, or the response stream
     * completes without an [EventType.EVENT_COMPONENT_EXECUTED] response.
     * @throws kotlinx.coroutines.TimeoutCancellationException if the response stream does not
     * complete within [timeOutMill] milliseconds.
     */
    @ExperimentalCoroutinesApi
    override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long):
        ExecutionServiceOutput {

        var output: ExecutionServiceOutput? = null
        val flow = openSubscription(selector, txId)

        try {
            /** Send the request */
            send(txId, input)

            /** Receive the response with timeout */
            withTimeout(timeOutMill) {
                flow.collect { response ->
                    log.trace("Received non-interactive transactionId($txId) response : ${response.status.eventType}")
                    if (response.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
                        output = response
                        cancelSubscription(txId)
                    }
                }
            }
        } finally {
            // On timeout or failure the cached entry would otherwise outlive this call, although its
            // response channel has already been consumed. This is a no-op after a successful cancel.
            cancelSubscription(txId)
        }

        return output
            ?: throw BluePrintException("failed to receive executed response for transactionId($txId)")
    }

    /** Cancel the Subscription for the [txId], This closes communication channel **/
    @ExperimentalCoroutinesApi
    override suspend fun cancelSubscription(txId: String) {
        commChannels.remove(txId)?.let { commChannel ->
            if (!commChannel.requestChannel.isClosedForSend) {
                commChannel.requestChannel.close()
            }
            /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
            // commChannel.responseChannel.cancel(CancellationException("subscription cancelled"))
            log.info("closed subscription for transactionId($txId)")
        }
    }

    /** Close the GRPC channel for the host port properties [selector] */
    override suspend fun closeChannel(selector: Any) {
        val selectorName = channelKey(grpcProperties(selector))
        grpcChannels.remove(selectorName)?.let { channel ->
            channel.shutdownNow()
            log.info("grpc channel($selectorName) shutdown completed")
        }
    }

    /** Check GRPC channel has been cached and not shutdown, If not re-create the channel and cache it. */
    private suspend fun grpcChannel(selector: Any): ManagedChannel {
        val grpcProperties = grpcProperties(selector)
        val selectorName = channelKey(grpcProperties)
        return grpcChannels[selectorName]?.takeUnless { it.isShutdown }
            ?: createGrpcChannel(grpcProperties).also { grpcChannels[selectorName] = it }
    }

    /** Create a GRPC channel for [grpcProperties] through the GRPC client service resolved for them. */
    suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
        val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
            .blueprintGrpcClientService(grpcProperties)
        return grpcClientService.channel()
    }

    /** Cache key of a GRPC channel: `host:port` of [grpcProperties]. */
    private fun channelKey(grpcProperties: GrpcClientProperties): String =
        "${grpcProperties.host}:${grpcProperties.port}"

    /**
     * Resolve [selector] to [GrpcClientProperties].
     *
     * Supported selectors are a [TextNode], a [JsonNode] and a ready-made [GrpcClientProperties].
     *
     * @throws BluePrintException for any other selector type.
     */
    private fun grpcProperties(selector: Any): GrpcClientProperties = when (selector) {
        // TextNode is a JsonNode, so it has to be matched before the broader JsonNode branch.
        is TextNode -> bluePrintGrpcLibPropertyService.grpcClientProperties(selector.asText())
        is JsonNode -> bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
        is GrpcClientProperties -> selector
        else -> throw BluePrintException("couldn't process selector($selector)")
    }
}