Bi-directional GRPC non interactive implementation
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / services / execution-service / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / services / execution / StreamingRemoteExecutionService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
21 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
22 import io.grpc.ManagedChannel
23 import kotlinx.coroutines.*
24 import kotlinx.coroutines.flow.Flow
25 import kotlinx.coroutines.flow.collect
26 import kotlinx.coroutines.flow.consumeAsFlow
27 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
28 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
29 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
30 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
31 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
32 import org.onap.ccsdk.cds.controllerblueprints.core.logger
33 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
36 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
37 import org.springframework.stereotype.Service
38
39 interface StreamingRemoteExecutionService<ReqT, ResT> {
40
41     suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
42
43     suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
44
45     suspend fun send(txId: String, input: ReqT)
46
47     suspend fun cancelSubscription(txId: String)
48
49     suspend fun closeChannel(selector: Any)
50 }
51
52 @Service
53 @ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
54         havingValue = "true", matchIfMissing = false)
55 class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
56     : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
57
58     private val log = logger(StreamingRemoteExecutionServiceImpl::class)
59
60     private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
61
62     private val commChannels: MutableMap<String,
63             ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
64
65
66     /**
67      * Open new channel to send and receive for grpc properties [selector] for [txId],
68      * Create the only one GRPC channel per host port and reuse for further communication.
69      * Create request communication channel to send and receive requests and responses.
70      * We can send multiple request with same txId.
71      * Consume the flow for responses,
72      * Client should cancel the subscription for the request Id once no longer response is needed.
73      * */
74     @FlowPreview
75     override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
76
77         if (!commChannels.containsKey(txId)) {
78             /** Get GRPC Channel*/
79             val grpcChannel = grpcChannel(selector)
80
81             /** Get Send and Receive Channel for bidirectional process method*/
82             val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
83             commChannels[txId] = channels
84         }
85
86         val commChannel = commChannels[txId]
87                 ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
88
89         log.info("created subscription for transactionId($txId)")
90
91         return commChannel.responseChannel.consumeAsFlow()
92     }
93
94     /**
95      * Send the [input] request, by reusing same GRPC channel and Communication channel
96      * for the request Id.
97      */
98     override suspend fun send(txId: String, input: ExecutionServiceInput) {
99         val sendChannel = commChannels[txId]?.requestChannel
100                 ?: throw BluePrintException("failed to get transactionId($txId) send channel")
101         coroutineScope {
102             launch {
103                 sendChannel.send(input)
104                 log.trace("Message sent for transactionId($txId)")
105             }
106         }
107     }
108
109     /**
110      * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
111      * for the GRPC [selector] with execution [timeOutMill].
112      * Other state of the request will be skipped.
113      * The assumption here is you can call this API with same request Id and unique subrequest Id,
114      * so the correlation is sub request id to receive the response.
115      */
116     @ExperimentalCoroutinesApi
117     override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
118             : ExecutionServiceOutput {
119
120         var output: ExecutionServiceOutput? = null
121         val flow = openSubscription(selector, txId)
122
123         /** Send the request */
124         val sendChannel = commChannels[txId]?.requestChannel
125                 ?: throw BluePrintException("failed to get transactionId($txId) send channel")
126         sendChannel.send(input)
127
128         /** Receive the response with timeout */
129         withTimeout(timeOutMill) {
130             flow.collect {
131                 log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
132                 if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
133                     output = it
134                     cancelSubscription(txId)
135                 }
136             }
137         }
138         return output!!
139     }
140
141     /** Cancel the Subscription for the [txId], This closes communication channel **/
142     @ExperimentalCoroutinesApi
143     override suspend fun cancelSubscription(txId: String) {
144         commChannels[txId]?.let {
145             if (!it.requestChannel.isClosedForSend)
146                 it.requestChannel.close()
147             /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
148             //it.responseChannel.cancel(CancellationException("subscription cancelled"))
149             commChannels.remove(txId)
150             log.info("closed subscription for transactionId($txId)")
151         }
152     }
153
154     /** Close the GRPC channel for the host port poperties [selector]*/
155     override suspend fun closeChannel(selector: Any) {
156         val grpcProperties = grpcProperties(selector)
157         val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
158         if (grpcChannels.containsKey(selectorName)) {
159             grpcChannels[selectorName]!!.shutdownNow()
160             grpcChannels.remove(selectorName)
161             log.info("grpc channel($selectorName) shutdown completed")
162         }
163     }
164
165     /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */
166     private suspend fun grpcChannel(selector: Any): ManagedChannel {
167         val grpcProperties = grpcProperties(selector)
168         val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
169         val isGrpcChannelCached = grpcChannels.containsKey(selectorName)
170         val grpcChannel = if (isGrpcChannelCached) {
171             if (grpcChannels[selectorName]!!.isShutdown) {
172                 createGrpcChannel(grpcProperties)
173             } else {
174                 grpcChannels[selectorName]!!
175             }
176         } else {
177             createGrpcChannel(grpcProperties)
178         }
179         grpcChannels[selectorName] = grpcChannel
180         return grpcChannel
181     }
182
183     suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
184         val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
185                 .blueprintGrpcClientService(grpcProperties)
186         return grpcClientService.channel()
187     }
188
189     private fun grpcProperties(selector: Any): GrpcClientProperties {
190         return when (selector) {
191             is String -> {
192                 bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString())
193             }
194             is JsonNode -> {
195                 bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
196             }
197             is GrpcClientProperties -> {
198                 selector
199             }
200             else -> {
201                 throw BluePrintException("couldn't process selector($selector)")
202             }
203         }
204     }
205 }