2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
19 import com.google.protobuf.util.JsonFormat
20 import io.grpc.inprocess.InProcessChannelBuilder
21 import io.grpc.inprocess.InProcessServerBuilder
22 import io.grpc.testing.GrpcCleanupRule
23 import io.mockk.coEvery
26 import kotlinx.coroutines.*
27 import kotlinx.coroutines.flow.collect
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
32 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
33 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
34 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
38 import org.onap.ccsdk.cds.controllerblueprints.core.logger
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
41 import kotlin.test.assertEquals
42 import kotlin.test.assertNotNull
/**
 * Tests [StreamingRemoteExecutionServiceImpl] using the in-process gRPC server and channel
 * builders declared below.
 */
45 class StreamingRemoteExecutionServiceTest {
// Logger used by the test to report each streamed response.
47 val log = logger(StreamingRemoteExecutionServiceTest::class)
// Cleanup rule that the test registers its in-process server and channel with.
50 val grpcCleanup = GrpcCleanupRule()
// One generated name is shared by the server and the channel builder, so both refer to the same in-process endpoint.
51 private val serverName = InProcessServerBuilder.generateName()
52 private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
53 private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()
// Client properties handed to the service calls in the test.
55 private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
58 type = GRPCLibConstants.TYPE_TOKEN_AUTH
// NOTE(review): looks like a Base64 "Basic" credential hard-coded for the test; confirm it is a dummy value.
59 token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
/**
 * Exercises the non-interactive and the interactive (subscription based) paths of
 * [StreamingRemoteExecutionServiceImpl] against [MockBluePrintProcessingServer].
 *
 * The server and channel are created from the in-process builders and registered with
 * [grpcCleanup]. The service under test is wrapped in a spy whose `createGrpcChannel` is stubbed
 * to return that in-process channel.
 *
 * NOTE(review): `count` and the coroutine scope required by `async` are not declared in the lines
 * visible here; confirm the enclosing constructs when editing this test.
 */
63 @ExperimentalCoroutinesApi
65 fun testStreamingChannel() {
// Start the mock server and open a channel to it; both are handed to the cleanup rule.
66 grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
67 val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())
70 val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())
72 val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)
74 val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
75 /** To test against a real server, comment out the line below (it substitutes the in-process channel) */
76 coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
78 /** Test Send and Receive non interactive transaction */
79 val nonInteractiveDeferred = arrayListOf<Deferred<*>>()
81 val requestId = "1234-$count"
82 val request = getRequest(requestId)
// The non-interactive call is keyed by the sub request id of the request.
83 val invocationId = request.commonHeader.subRequestId
84 val deferred = async {
// NOTE(review): 1000L is presumably a timeout; confirm unit and meaning against sendNonInteractive.
85 val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties,
86 invocationId, request, 1000L)
87 assertNotNull(response, "failed to get non interactive response")
// kotlin.test takes (expected, actual, message): the expected value goes first so failures read correctly.
88 assertEquals(requestId, response.commonHeader.requestId,
89 "failed to match non interactive response id")
90 assertEquals(EventType.EVENT_COMPONENT_EXECUTED, response.status.eventType,
91 "failed to match non interactive response type")
93 nonInteractiveDeferred.add(deferred)
// Wait for every non-interactive call (and its assertions) to finish.
96 nonInteractiveDeferred.awaitAll()
98 /** Test Send and Receive interactive transaction */
99 val responseFlowsDeferred = arrayListOf<Deferred<*>>()
101 val requestId = "12345-$count"
102 val request = getRequest(requestId)
// The interactive subscription is keyed by the request id (not the sub request id).
103 val invocationId = request.commonHeader.requestId
104 val responseFlow = spyStreamingRemoteExecutionService
105 .openSubscription(tokenAuthGrpcClientProperties, invocationId)
// Collect responses concurrently; the subscription is cancelled once EVENT_COMPONENT_EXECUTED arrives.
107 val deferred = async {
108 responseFlow.collect {
109 log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
110 if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
111 spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
115 responseFlowsDeferred.add(deferred)
116 /** Sending Multiple messages with same requestId and different subRequestId */
117 spyStreamingRemoteExecutionService.send(invocationId, request)
119 responseFlowsDeferred.awaitAll()
// NOTE(review): closeChannel is called on the original instance, not on the spy used above; confirm this is intended.
120 streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
/**
 * Assembles the [ExecutionServiceInput] that the test sends for the given request id.
 *
 * The common header carries [requestId] as request id and `"<requestId>-<random UUID>"` as sub
 * request id. The action identifiers name action "SampleScript" of blueprint "sample-cba"
 * version "1.0.0" in sync mode. The payload is parsed from a small JSON literal.
 *
 * @param requestId request id written to the header and used as prefix of the sub request id.
 * @return the request assembled from the header, action identifiers and payload below.
 */
125 private fun getRequest(requestId: String): ExecutionServiceInput {
// Header: fixed timestamp and originator, the caller's request id, and a sub request id made unique with a random UUID.
126 val commonHeader = CommonHeader.newBuilder()
127 .setTimestamp("2012-04-23T18:25:43.511Z")
128 .setOriginatorId("System")
129 .setRequestId(requestId)
130 .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build()
// Action identifiers: which blueprint, version and action to run, and in which mode.
133 val actionIdentifier = ActionIdentifiers.newBuilder()
134 .setActionName("SampleScript")
135 .setBlueprintName("sample-cba")
136 .setBlueprintVersion("1.0.0")
137 .setMode(ACTION_MODE_SYNC)
// Payload: the JSON text is merged into a payload builder taken from a separate, throwaway input builder.
140 val jsonContent = """{ "key1" : "value1" }"""
141 val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder
142 JsonFormat.parser().merge(jsonContent, payloadBuilder)
// Combine the three parts on a fresh builder.
144 return ExecutionServiceInput.newBuilder()
145 .setCommonHeader(commonHeader)
146 .setActionIdentifiers(actionIdentifier)
147 .setPayload(payloadBuilder.build())