07848164ee0837af0d38f6f0db04c163daf181f1
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
18
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
21 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
22 import io.grpc.ManagedChannel
23 import kotlinx.coroutines.ExperimentalCoroutinesApi
24 import kotlinx.coroutines.FlowPreview
25 import kotlinx.coroutines.coroutineScope
26 import kotlinx.coroutines.flow.Flow
27 import kotlinx.coroutines.flow.consumeAsFlow
28 import kotlinx.coroutines.launch
29 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
30 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
32 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
36 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
37 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
38 import org.springframework.stereotype.Service
39
/**
 * Contract for a streaming remote execution client that exchanges requests of type [ReqT]
 * and responses of type [ResT] over a long-lived subscription identified by a request id.
 */
interface StreamingRemoteExecutionService<ReqT, ResT> {

    /**
     * Opens a subscription for [requestId] against the remote endpoint described by [selector].
     *
     * @param selector description of the remote endpoint; the accepted types are defined by the implementation.
     * @param requestId identifier that ties subsequent [send] and [cancelSubscription] calls to this subscription.
     * @return the stream of responses received for [requestId].
     */
    suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT>

    /**
     * Sends [input] to the remote endpoint.
     *
     * @param input the request to deliver.
     */
    suspend fun send(input: ReqT)

    /**
     * Cancels the subscription previously opened for [requestId].
     *
     * @param requestId identifier passed to [openSubscription].
     */
    suspend fun cancelSubscription(requestId: String)

    /**
     * Closes the underlying channel to the remote endpoint described by [selector].
     *
     * @param selector description of the remote endpoint, as passed to [openSubscription].
     */
    suspend fun closeChannel(selector: Any)
}
50
/**
 * [StreamingRemoteExecutionService] implementation that talks to the blueprint processing
 * `process` bidirectional gRPC method.
 *
 * One [ManagedChannel] is cached per `host:port`, and one bidirectional call channel is cached
 * per request id. The bean is only registered when
 * `blueprintsprocessor.streamingRemoteExecution.enabled` is `true`.
 *
 * NOTE(review): both caches are plain hash maps without synchronization; confirm that callers
 * do not open/cancel subscriptions concurrently, or guard these maps.
 */
@Service
@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
        havingValue = "true", matchIfMissing = false)
class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
    : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {

    private val log = logger(StreamingRemoteExecutionServiceImpl::class)

    /** GRPC channels keyed by `host:port`. */
    private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()

    /** Bidirectional send/receive call channels keyed by request id. */
    private val commChannels: MutableMap<String,
            ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()

    /**
     * Open new channel to send and receive for grpc properties [selector] for [requestId].
     * Only one GRPC channel is created per host port and it is reused for further communication.
     * A request communication channel is created (or reused, if one is already cached for
     * [requestId]) to send requests and receive responses.
     * Multiple requests can be sent with the same requestId and unique subRequestId.
     * Consume the returned flow for responses.
     * Client should cancel the subscription for the request id once no more responses are needed.
     *
     * NOTE(review): the flow is built with `consumeAsFlow()` over the cached response channel;
     * confirm that callers do not collect it more than once per request id.
     *
     * @param selector GRPC client properties selector, see [grpcProperties] for the accepted types.
     * @param requestId request id the subscription is registered under.
     * @return flow of responses received on the communication channel of [requestId].
     * @throws BluePrintException if [selector] is of an unsupported type.
     */
    @FlowPreview
    override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> {

        // getOrPut is inline, so the suspending grpcChannel() call is allowed inside the lambda.
        val commChannel = commChannels.getOrPut(requestId) {
            /** Get GRPC Channel*/
            val grpcChannel = grpcChannel(selector)

            /** Get Send and Receive Channel for bidirectional process method*/
            clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
        }

        log.info("created subscription for request($requestId)")

        return commChannel.responseChannel.consumeAsFlow()
    }

    /**
     * Send the [input] request, by reusing same GRPC channel and communication channel
     * for the request id carried in the common header of [input].
     *
     * Suspends until the message has been handed over to the request channel.
     *
     * @param input request to send; its `commonHeader.requestId` must match an open subscription.
     * @throws BluePrintException if no subscription is open for the request id.
     */
    override suspend fun send(input: ExecutionServiceInput) {
        val requestId = input.commonHeader.requestId
        val subRequestId = input.commonHeader.subRequestId
        val sendChannel = commChannels[requestId]?.requestChannel
                ?: throw BluePrintException("failed to get request($requestId) send channel")
        // Send directly: wrapping a single send in coroutineScope { launch { } } only adds
        // an extra coroutine, since the scope waits for its child before returning anyway.
        sendChannel.send(input)
        log.trace("Message sent for request($requestId) : subRequest($subRequestId)")
    }

    /**
     * Cancel the subscription for the [requestId]. This closes the request side of the
     * communication channel and drops it from the cache. Does nothing if no subscription
     * is registered for [requestId].
     *
     * @param requestId request id passed to [openSubscription].
     */
    @ExperimentalCoroutinesApi
    override suspend fun cancelSubscription(requestId: String) {
        commChannels.remove(requestId)?.let { commChannel ->
            if (!commChannel.requestChannel.isClosedForSend) {
                commChannel.requestChannel.close()
            }
            /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
            // commChannel.responseChannel.cancel(CancellationException("subscription cancelled"))
            log.info("closed subscription for request($requestId)")
        }
    }

    /**
     * Close the GRPC channel for the host port properties [selector].
     * Does nothing if no channel is cached for that host and port.
     *
     * @param selector GRPC client properties selector, see [grpcProperties] for the accepted types.
     * @throws BluePrintException if [selector] is of an unsupported type.
     */
    override suspend fun closeChannel(selector: Any) {
        val selectorName = channelKey(grpcProperties(selector))
        grpcChannels.remove(selectorName)?.let { grpcChannel ->
            grpcChannel.shutdownNow()
            log.info("grpc channel($selectorName) shutdown completed")
        }
    }

    /**
     * Return the cached GRPC channel for [selector] if it exists and is not shutdown,
     * otherwise create a new channel and cache it.
     */
    private suspend fun grpcChannel(selector: Any): ManagedChannel {
        val grpcProperties = grpcProperties(selector)
        val selectorName = channelKey(grpcProperties)
        return grpcChannels[selectorName]?.takeUnless { it.isShutdown }
                ?: createGrpcChannel(grpcProperties).also { grpcChannels[selectorName] = it }
    }

    /**
     * Create a GRPC channel for [grpcProperties] through the client service resolved from
     * [bluePrintGrpcLibPropertyService]. The created channel is not cached by this method.
     *
     * @param grpcProperties resolved GRPC client properties.
     * @return the channel provided by the resolved client service.
     */
    suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
        val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
                .blueprintGrpcClientService(grpcProperties)
        return grpcClientService.channel()
    }

    /** Build the `host:port` key under which the GRPC channel of [grpcProperties] is cached. */
    private fun channelKey(grpcProperties: GrpcClientProperties): String =
            "${grpcProperties.host}:${grpcProperties.port}"

    /**
     * Resolve [selector] to GRPC client properties.
     *
     * A [String] or [JsonNode] selector is resolved through [bluePrintGrpcLibPropertyService];
     * a [GrpcClientProperties] selector is returned as is.
     *
     * @throws BluePrintException for any other selector type.
     */
    private fun grpcProperties(selector: Any): GrpcClientProperties = when (selector) {
        is String -> bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
        is JsonNode -> bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
        is GrpcClientProperties -> selector
        else -> throw BluePrintException("couldn't process selector($selector)")
    }
}