Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / services / execution-service / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / services / execution / StreamingRemoteExecutionService.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.fasterxml.jackson.databind.node.TextNode
21 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
22 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
23 import io.grpc.ManagedChannel
24 import kotlinx.coroutines.ExperimentalCoroutinesApi
25 import kotlinx.coroutines.FlowPreview
26 import kotlinx.coroutines.coroutineScope
27 import kotlinx.coroutines.flow.Flow
28 import kotlinx.coroutines.flow.collect
29 import kotlinx.coroutines.flow.consumeAsFlow
30 import kotlinx.coroutines.launch
31 import kotlinx.coroutines.withTimeout
32 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
33 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
34 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
36 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
37 import org.onap.ccsdk.cds.controllerblueprints.core.logger
38 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
40 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
41 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
42 import org.springframework.stereotype.Service
43
44 interface StreamingRemoteExecutionService<ReqT, ResT> {
45
46     suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
47
48     suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
49
50     suspend fun send(txId: String, input: ReqT)
51
52     suspend fun cancelSubscription(txId: String)
53
54     suspend fun closeChannel(selector: Any)
55 }
56
57 @Service
58 @ConditionalOnProperty(
59     prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
60     havingValue = "true", matchIfMissing = false
61 )
62 class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService) :
63     StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
64
65     private val log = logger(StreamingRemoteExecutionServiceImpl::class)
66
67     private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
68
69     private val commChannels: MutableMap<String,
70         ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
71
72     /**
73      * Open new channel to send and receive for grpc properties [selector] for [txId],
74      * Create the only one GRPC channel per host port and reuse for further communication.
75      * Create request communication channel to send and receive requests and responses.
76      * We can send multiple request with same txId.
77      * Consume the flow for responses,
78      * Client should cancel the subscription for the request Id once no longer response is needed.
79      * */
80     @FlowPreview
81     override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
82
83         if (!commChannels.containsKey(txId)) {
84             /** Get GRPC Channel*/
85             val grpcChannel = grpcChannel(selector)
86
87             /** Get Send and Receive Channel for bidirectional process method*/
88             val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
89             commChannels[txId] = channels
90         }
91
92         val commChannel = commChannels[txId]
93             ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
94
95         log.info("created subscription for transactionId($txId)")
96
97         return commChannel.responseChannel.consumeAsFlow()
98     }
99
100     /**
101      * Send the [input] request, by reusing same GRPC channel and Communication channel
102      * for the request Id.
103      */
104     override suspend fun send(txId: String, input: ExecutionServiceInput) {
105         val sendChannel = commChannels[txId]?.requestChannel
106             ?: throw BluePrintException("failed to get transactionId($txId) send channel")
107         coroutineScope {
108             launch {
109                 sendChannel.send(input)
110                 log.trace("Message sent for transactionId($txId)")
111             }
112         }
113     }
114
115     /**
116      * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
117      * for the GRPC [selector] with execution [timeOutMill].
118      * Other state of the request will be skipped.
119      * The assumption here is you can call this API with same request Id and unique subrequest Id,
120      * so the correlation is sub request id to receive the response.
121      */
122     @ExperimentalCoroutinesApi
123     override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long):
124         ExecutionServiceOutput {
125
126             var output: ExecutionServiceOutput? = null
127             val flow = openSubscription(selector, txId)
128
129             /** Send the request */
130             val sendChannel = commChannels[txId]?.requestChannel
131                 ?: throw BluePrintException("failed to get transactionId($txId) send channel")
132             sendChannel.send(input)
133
134             /** Receive the response with timeout */
135             withTimeout(timeOutMill) {
136                 flow.collect {
137                     log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
138                     if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
139                         output = it
140                         cancelSubscription(txId)
141                     }
142                 }
143             }
144             return output!!
145         }
146
147     /** Cancel the Subscription for the [txId], This closes communication channel **/
148     @ExperimentalCoroutinesApi
149     override suspend fun cancelSubscription(txId: String) {
150         commChannels[txId]?.let {
151             if (!it.requestChannel.isClosedForSend)
152                 it.requestChannel.close()
153             /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
154             // it.responseChannel.cancel(CancellationException("subscription cancelled"))
155             commChannels.remove(txId)
156             log.info("closed subscription for transactionId($txId)")
157         }
158     }
159
160     /** Close the GRPC channel for the host port poperties [selector]*/
161     override suspend fun closeChannel(selector: Any) {
162         val grpcProperties = grpcProperties(selector)
163         val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
164         if (grpcChannels.containsKey(selectorName)) {
165             grpcChannels[selectorName]!!.shutdownNow()
166             grpcChannels.remove(selectorName)
167             log.info("grpc channel($selectorName) shutdown completed")
168         }
169     }
170
171     /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */
172     private suspend fun grpcChannel(selector: Any): ManagedChannel {
173         val grpcProperties = grpcProperties(selector)
174         val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
175         val isGrpcChannelCached = grpcChannels.containsKey(selectorName)
176         val grpcChannel = if (isGrpcChannelCached) {
177             if (grpcChannels[selectorName]!!.isShutdown) {
178                 createGrpcChannel(grpcProperties)
179             } else {
180                 grpcChannels[selectorName]!!
181             }
182         } else {
183             createGrpcChannel(grpcProperties)
184         }
185         grpcChannels[selectorName] = grpcChannel
186         return grpcChannel
187     }
188
189     suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
190         val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
191             .blueprintGrpcClientService(grpcProperties)
192         return grpcClientService.channel()
193     }
194
195     private fun grpcProperties(selector: Any): GrpcClientProperties {
196         return when (selector) {
197             is TextNode -> {
198                 bluePrintGrpcLibPropertyService.grpcClientProperties(selector.asText())
199             }
200             is JsonNode -> {
201                 bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
202             }
203             is GrpcClientProperties -> {
204                 selector
205             }
206             else -> {
207                 throw BluePrintException("couldn't process selector($selector)")
208             }
209         }
210     }
211 }