2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
19 import com.google.protobuf.util.JsonFormat
20 import io.grpc.inprocess.InProcessChannelBuilder
21 import io.grpc.inprocess.InProcessServerBuilder
22 import io.grpc.testing.GrpcCleanupRule
23 import io.mockk.coEvery
26 import kotlinx.coroutines.Deferred
27 import kotlinx.coroutines.ExperimentalCoroutinesApi
28 import kotlinx.coroutines.FlowPreview
29 import kotlinx.coroutines.async
30 import kotlinx.coroutines.awaitAll
31 import kotlinx.coroutines.flow.collect
32 import kotlinx.coroutines.runBlocking
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
36 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
37 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
38 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
39 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
40 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
41 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
42 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
43 import org.onap.ccsdk.cds.controllerblueprints.core.logger
44 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
46 import kotlin.test.assertEquals
47 import kotlin.test.assertNotNull
49 class StreamingRemoteExecutionServiceTest {
51 val log = logger(StreamingRemoteExecutionServiceTest::class)
54 val grpcCleanup = GrpcCleanupRule()
55 private val serverName = InProcessServerBuilder.generateName()
56 private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
57 private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()
59 private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
62 type = GRPCLibConstants.TYPE_TOKEN_AUTH
63 token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
67 @ExperimentalCoroutinesApi
69 fun testStreamingChannel() {
70 grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
71 val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())
74 val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())
76 val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)
78 val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
79 /** To test with real server, comment below line */
80 coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
82 /** Test Send and Receive non interactive transaction */
83 val nonInteractiveDeferred = arrayListOf<Deferred<*>>()
85 val requestId = "1234-$count"
86 val request = getRequest(requestId)
87 val invocationId = request.commonHeader.subRequestId
88 val deferred = async {
89 val response = spyStreamingRemoteExecutionService.sendNonInteractive(
90 tokenAuthGrpcClientProperties,
91 invocationId, request, 1000L
93 assertNotNull(response, "failed to get non interactive response")
95 response.commonHeader.requestId, requestId,
96 "failed to match non interactive response id"
99 response.status.eventType, EventType.EVENT_COMPONENT_EXECUTED,
100 "failed to match non interactive response type"
103 nonInteractiveDeferred.add(deferred)
105 nonInteractiveDeferred.awaitAll()
107 /** Test Send and Receive interactive transaction */
108 val responseFlowsDeferred = arrayListOf<Deferred<*>>()
110 val requestId = "12345-$count"
111 val request = getRequest(requestId)
112 val invocationId = request.commonHeader.requestId
113 val responseFlow = spyStreamingRemoteExecutionService
114 .openSubscription(tokenAuthGrpcClientProperties, invocationId)
116 val deferred = async {
117 responseFlow.collect {
118 log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
119 if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
120 spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
124 responseFlowsDeferred.add(deferred)
125 /** Sending Multiple messages with same requestId and different subRequestId */
126 spyStreamingRemoteExecutionService.send(invocationId, request)
128 responseFlowsDeferred.awaitAll()
129 streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
133 private fun getRequest(requestId: String): ExecutionServiceInput {
134 val commonHeader = CommonHeader.newBuilder()
135 .setTimestamp("2012-04-23T18:25:43.511Z")
136 .setOriginatorId("System")
137 .setRequestId(requestId)
138 .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build()
140 val actionIdentifier = ActionIdentifiers.newBuilder()
141 .setActionName("SampleScript")
142 .setBlueprintName("sample-cba")
143 .setBlueprintVersion("1.0.0")
144 .setMode(ACTION_MODE_SYNC)
147 val jsonContent = """{ "key1" : "value1" }"""
148 val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder
149 JsonFormat.parser().merge(jsonContent, payloadBuilder)
151 return ExecutionServiceInput.newBuilder()
152 .setCommonHeader(commonHeader)
153 .setActionIdentifiers(actionIdentifier)
154 .setPayload(payloadBuilder.build())