2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
19 import com.fasterxml.jackson.databind.JsonNode
20 import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
21 import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
22 import io.grpc.ManagedChannel
23 import kotlinx.coroutines.ExperimentalCoroutinesApi
24 import kotlinx.coroutines.FlowPreview
25 import kotlinx.coroutines.coroutineScope
26 import kotlinx.coroutines.flow.Flow
27 import kotlinx.coroutines.flow.consumeAsFlow
28 import kotlinx.coroutines.launch
29 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
30 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
32 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
33 import org.onap.ccsdk.cds.controllerblueprints.core.logger
34 import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
35 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
36 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
37 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
38 import org.springframework.stereotype.Service
40 interface StreamingRemoteExecutionService<ReqT, ResT> {
42 suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT>
44 suspend fun send(input: ReqT)
46 suspend fun cancelSubscription(requestId: String)
48 suspend fun closeChannel(selector: Any)
52 @ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
53 havingValue = "true", matchIfMissing = false)
54 class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
55 : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
57 private val log = logger(StreamingRemoteExecutionServiceImpl::class)
59 private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
61 private val commChannels: MutableMap<String,
62 ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
66 * Open new channel to send and receive for grpc properties [selector] for [requestId],
67 * Create the only one GRPC channel per host port and reuse for further communication.
68 * Create request communication channel to send and receive requests and responses.
69 * We can send multiple request with same requestId with unique subRequestId.
70 * Consume the flow for responses,
71 * Client should cancel the subscription for the request Id once no longer response is needed.
74 override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> {
76 if (!commChannels.containsKey(requestId)) {
77 /** Get GRPC Channel*/
78 val grpcChannel = grpcChannel(selector)
80 /** Get Send and Receive Channel for bidirectional process method*/
81 val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
82 commChannels[requestId] = channels
85 val commChannel = commChannels[requestId]
86 ?: throw BluePrintException("failed to create response subscription for request($requestId) channel")
88 log.info("created subscription for request($requestId)")
90 return commChannel.responseChannel.consumeAsFlow()
94 * Send the [input]request, by reusing same GRPC channel and Communication channel
97 override suspend fun send(input: ExecutionServiceInput) {
98 val requestId = input.commonHeader.requestId
99 val subRequestId = input.commonHeader.subRequestId
100 val sendChannel = commChannels[requestId]?.requestChannel
101 ?: throw BluePrintException("failed to get request($requestId) send channel")
104 sendChannel.send(input)
105 log.trace("Message sent for request($requestId) : subRequest($subRequestId)")
110 /** Cancel the Subscription for the [requestId], This closes communication channel **/
111 @ExperimentalCoroutinesApi
112 override suspend fun cancelSubscription(requestId: String) {
113 commChannels[requestId]?.let {
114 if (!it.requestChannel.isClosedForSend)
115 it.requestChannel.close()
116 /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
117 //it.responseChannel.cancel(CancellationException("subscription cancelled"))
118 commChannels.remove(requestId)
119 log.info("closed subscription for request($requestId)")
123 /** Close the GRPC channel for the host port poperties [selector]*/
124 override suspend fun closeChannel(selector: Any) {
125 val grpcProperties = grpcProperties(selector)
126 val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
127 if (grpcChannels.containsKey(selectorName)) {
128 grpcChannels[selectorName]!!.shutdownNow()
129 grpcChannels.remove(selectorName)
130 log.info("grpc channel($selectorName) shutdown completed")
134 /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */
135 private suspend fun grpcChannel(selector: Any): ManagedChannel {
136 val grpcProperties = grpcProperties(selector)
137 val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
138 val isGrpcChannelCached = grpcChannels.containsKey(selectorName)
139 val grpcChannel = if (isGrpcChannelCached) {
140 if (grpcChannels[selectorName]!!.isShutdown) {
141 createGrpcChannel(grpcProperties)
143 grpcChannels[selectorName]!!
146 createGrpcChannel(grpcProperties)
148 grpcChannels[selectorName] = grpcChannel
152 suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
153 val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
154 .blueprintGrpcClientService(grpcProperties)
155 return grpcClientService.channel()
158 private fun grpcProperties(selector: Any): GrpcClientProperties {
159 return when (selector) {
161 bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString())
164 bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
166 is GrpcClientProperties -> {
170 throw BluePrintException("couldn't process selector($selector)")