Renaming Files having BluePrint to have Blueprint
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / services / execution-service / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / services / execution / StreamingRemoteExecutionService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
21 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
22 import io.grpc.ManagedChannel
23 import kotlinx.coroutines.ExperimentalCoroutinesApi
24 import kotlinx.coroutines.FlowPreview
25 import kotlinx.coroutines.coroutineScope
26 import kotlinx.coroutines.flow.Flow
27 import kotlinx.coroutines.flow.collect
28 import kotlinx.coroutines.flow.consumeAsFlow
29 import kotlinx.coroutines.launch
30 import kotlinx.coroutines.withTimeout
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
32 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BlueprintGrpcClientService
33 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BlueprintGrpcLibPropertyService
34 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
35 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BlueprintProcessingServiceGrpc
38 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
40 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
41 import org.springframework.stereotype.Service
42
/**
 * Contract for a streaming (bidirectional) remote execution client.
 *
 * Subscriptions are addressed by a transaction id ([txId]); the remote endpoint is
 * addressed by an opaque `selector` whose accepted shapes are defined by the implementation.
 *
 * @param ReqT type of the request messages sent to the remote endpoint.
 * @param ResT type of the response messages received from the remote endpoint.
 */
interface StreamingRemoteExecutionService<ReqT, ResT> {

    /**
     * Opens the subscription identified by [txId] against the endpoint described by [selector]
     * and returns the stream of responses for it.
     */
    suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>

    /** Sends [input] on the subscription previously opened for [txId]. */
    suspend fun send(txId: String, input: ReqT)

    /**
     * Convenience request/response call: sends [input] for [txId] to the endpoint described by
     * [selector] and returns a single response, waiting at most [timeOutMill] milliseconds.
     */
    suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT

    /** Cancels the subscription identified by [txId]. */
    suspend fun cancelSubscription(txId: String)

    /** Closes the underlying transport channel for the endpoint described by [selector]. */
    suspend fun closeChannel(selector: Any)
}
55
/**
 * GRPC based [StreamingRemoteExecutionService] that talks to the blueprint processing
 * bidirectional `process` method.
 *
 * One [ManagedChannel] is cached per "host:port" and one bidirectional call channel is cached
 * per transaction id. The bean is only registered when the property
 * `blueprintsprocessor.streamingRemoteExecution.enabled` is `true`.
 */
@Service
@ConditionalOnProperty(
    prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
    havingValue = "true", matchIfMissing = false
)
class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BlueprintGrpcLibPropertyService) :
    StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {

    private val log = logger(StreamingRemoteExecutionServiceImpl::class)

    // NOTE(review): both caches below are plain hash maps and this class does no synchronization
    // around them - confirm that callers never mutate them from concurrent threads.

    /** GRPC channels keyed by "host:port" (see [channelKey]). */
    private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()

    /** Bidirectional request/response call channels keyed by transaction id. */
    private val commChannels: MutableMap<String,
        ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()

    /**
     * Open new channel to send and receive for grpc properties [selector] for [txId].
     * Only one GRPC channel is created per host:port and it is reused for further communication.
     * A request communication channel is created (or reused, if one is already registered for
     * [txId]) to send requests and receive responses, so multiple requests can be sent with the
     * same txId.
     * Consume the returned flow for responses; the client should cancel the subscription for the
     * txId once no further response is needed.
     *
     * @throws BlueprintException if [selector] is of an unsupported type.
     */
    @FlowPreview
    override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
        val commChannel = commChannels.getOrPut(txId) {
            /** Get GRPC Channel */
            val grpcChannel = grpcChannel(selector)

            /** Get Send and Receive Channel for bidirectional process method */
            clientCallBidiStreaming(BlueprintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
        }

        log.info("created subscription for transactionId($txId)")

        return commChannel.responseChannel.consumeAsFlow()
    }

    /**
     * Send the [input] request, by reusing the communication channel registered for [txId].
     *
     * @throws BlueprintException if no subscription is open for [txId].
     */
    override suspend fun send(txId: String, input: ExecutionServiceInput) {
        val sendChannel = commChannels[txId]?.requestChannel
            ?: throw BlueprintException("failed to get transactionId($txId) send channel")
        // coroutineScope waits for the launched child, so this function returns only after
        // the message has been handed to the request channel (or the send has failed).
        coroutineScope {
            launch {
                sendChannel.send(input)
                log.trace("Message sent for transactionId($txId)")
            }
        }
    }

    /**
     * Simplified version of the streaming calls. Use this API when only listening for the actual
     * response for [input] from the GRPC [selector], with an execution timeout of [timeOutMill]
     * milliseconds. Responses whose event type is not [EventType.EVENT_COMPONENT_EXECUTED] are
     * skipped.
     * The assumption here is that this API can be called with the same request id and a unique
     * sub request id, so the correlation is the sub request id to receive the response.
     *
     * The subscription for [txId] is always cancelled before this function returns or throws,
     * so a failed or timed out call does not leave a stale communication channel behind.
     *
     * @return the last [EventType.EVENT_COMPONENT_EXECUTED] response received.
     * @throws BlueprintException if the send channel is missing, or if the response stream
     * completed without delivering an [EventType.EVENT_COMPONENT_EXECUTED] response.
     * @throws kotlinx.coroutines.TimeoutCancellationException if the response stream did not
     * complete within [timeOutMill] milliseconds.
     */
    @ExperimentalCoroutinesApi
    override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long):
        ExecutionServiceOutput {

        var output: ExecutionServiceOutput? = null
        val flow = openSubscription(selector, txId)

        try {
            /** Send the request */
            val sendChannel = commChannels[txId]?.requestChannel
                ?: throw BlueprintException("failed to get transactionId($txId) send channel")
            sendChannel.send(input)

            /** Receive the response with timeout */
            withTimeout(timeOutMill) {
                // Collection keeps running until the response flow completes or the timeout fires;
                // cancelling the subscription only closes the request side.
                flow.collect {
                    log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
                    if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
                        output = it
                        cancelSubscription(txId)
                    }
                }
            }
        } finally {
            // No-op when the subscription was already cancelled above; otherwise releases the
            // communication channel on timeout, send failure or premature stream completion.
            cancelSubscription(txId)
        }

        return output
            ?: throw BlueprintException("failed to receive response for transactionId($txId)")
    }

    /**
     * Cancel the subscription for the [txId]. This closes the request side of the communication
     * channel and forgets it; calling it for an unknown [txId] does nothing.
     */
    @ExperimentalCoroutinesApi
    override suspend fun cancelSubscription(txId: String) {
        commChannels.remove(txId)?.let { commChannel ->
            if (!commChannel.requestChannel.isClosedForSend) {
                commChannel.requestChannel.close()
            }
            /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
            // commChannel.responseChannel.cancel(CancellationException("subscription cancelled"))
            log.info("closed subscription for transactionId($txId)")
        }
    }

    /**
     * Close the GRPC channel for the host port properties [selector].
     * Does nothing when no channel is cached for that host:port.
     *
     * @throws BlueprintException if [selector] is of an unsupported type.
     */
    override suspend fun closeChannel(selector: Any) {
        val selectorName = channelKey(grpcProperties(selector))
        grpcChannels.remove(selectorName)?.let { grpcChannel ->
            grpcChannel.shutdownNow()
            log.info("grpc channel($selectorName) shutdown completed")
        }
    }

    /**
     * Return the cached GRPC channel for [selector] if it exists and has not been shut down;
     * otherwise create a new channel and cache it.
     */
    private suspend fun grpcChannel(selector: Any): ManagedChannel {
        val grpcProperties = grpcProperties(selector)
        val selectorName = channelKey(grpcProperties)
        return grpcChannels[selectorName]?.takeUnless { it.isShutdown }
            ?: createGrpcChannel(grpcProperties).also { grpcChannels[selectorName] = it }
    }

    /** Create a new GRPC channel for [grpcProperties] through the GRPC lib property service. */
    suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
        val grpcClientService: BlueprintGrpcClientService = bluePrintGrpcLibPropertyService
            .blueprintGrpcClientService(grpcProperties)
        return grpcClientService.channel()
    }

    /** Cache key used for [grpcChannels]: "host:port" of [grpcProperties]. */
    private fun channelKey(grpcProperties: GrpcClientProperties): String =
        "${grpcProperties.host}:${grpcProperties.port}"

    /**
     * Resolve [selector] into [GrpcClientProperties]. A [String] or [JsonNode] selector is
     * resolved through the GRPC lib property service; a [GrpcClientProperties] is used as is.
     *
     * @throws BlueprintException for any other selector type.
     */
    private fun grpcProperties(selector: Any): GrpcClientProperties = when (selector) {
        is String -> bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
        is JsonNode -> bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
        is GrpcClientProperties -> selector
        else -> throw BlueprintException("couldn't process selector($selector)")
    }
}