2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
19 import io.grpc.inprocess.InProcessChannelBuilder
20 import io.grpc.inprocess.InProcessServerBuilder
21 import io.grpc.testing.GrpcCleanupRule
22 import io.mockk.coEvery
25 import kotlinx.coroutines.*
26 import kotlinx.coroutines.flow.collect
29 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
30 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
32 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
33 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
34 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
39 import kotlin.test.assertEquals
40 import kotlin.test.assertNotNull
/**
 * Test for [StreamingRemoteExecutionServiceImpl] that runs against a
 * [MockBluePrintProcessingServer] through an in-process gRPC server and channel.
 */
43 class StreamingRemoteExecutionServiceTest {
45 val log = logger(StreamingRemoteExecutionServiceTest::class)
// Cleanup rule the test registers its in-process server and channel with.
48 val grpcCleanup = GrpcCleanupRule()
// Generated name shared by the in-process server builder and channel builder below,
// so that the channel connects to the server started by the test.
49 private val serverName = InProcessServerBuilder.generateName()
// Both builders use directExecutor().
50 private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
51 private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()
// Client properties handed to the service calls in the test: token-auth type with a fixed token.
53 private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
56 type = GRPCLibConstants.TYPE_TOKEN_AUTH
57 token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
/**
 * Exercises the streaming remote execution service against the mock processing server.
 *
 * Two paths are covered:
 *  - non-interactive: `sendNonInteractive` is called and the response's request id and
 *    event type are asserted;
 *  - interactive: a subscription is opened, a request is sent on it, and the subscription
 *    is cancelled once an `EVENT_COMPONENT_EXECUTED` response is collected.
 *
 * The channel is closed at the end of the test.
 */
61 @ExperimentalCoroutinesApi
63 fun testStreamingChannel() {
// Start the in-process server with the mock service and register it with the cleanup rule.
64 grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
// Build the matching in-process channel (max inbound message size 1024) and register it as well.
65 val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())
// The property service gets a mocked dependency.
68 val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())
70 val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)
// Spy on the service so that channel creation can be stubbed below.
72 val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
/** Stub channel creation to return the in-process channel. To test with a real server, comment out the line below */
74 coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
76 /** Test Send and Receive non interactive transaction */
77 val nonInteractiveDeferred = arrayListOf<Deferred<*>>()
79 val requestId = "1234-$count"
80 val request = getRequest(requestId)
// The non-interactive call is keyed by the sub request id.
81 val invocationId = request.commonHeader.subRequestId
82 val deferred = async {
// 1000L is presumably a timeout in milliseconds — TODO confirm against sendNonInteractive.
83 val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties,
84 invocationId, request, 1000L)
85 assertNotNull(response, "failed to get non interactive response")
// kotlin.test.assertEquals takes (expected, actual, message): expected value goes first.
86 assertEquals(requestId, response.commonHeader.requestId,
87 "failed to match non interactive response id")
88 assertEquals(EventType.EVENT_COMPONENT_EXECUTED, response.status.eventType,
89 "failed to match non interactive response type")
91 nonInteractiveDeferred.add(deferred)
// Wait for every non-interactive call (and its assertions) to finish.
94 nonInteractiveDeferred.awaitAll()
96 /** Test Send and Receive interactive transaction */
97 val responseFlowsDeferred = arrayListOf<Deferred<*>>()
99 val requestId = "12345-$count"
100 val request = getRequest(requestId)
// The interactive subscription is keyed by the request id (not the sub request id).
101 val invocationId = request.commonHeader.requestId
102 val responseFlow = spyStreamingRemoteExecutionService
103 .openSubscription(tokenAuthGrpcClientProperties, invocationId)
// Collect responses asynchronously; cancel the subscription once the executed event arrives.
105 val deferred = async {
106 responseFlow.collect {
107 log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
108 if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
109 spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
113 responseFlowsDeferred.add(deferred)
114 /** Sending Multiple messages with same requestId and different subRequestId */
115 spyStreamingRemoteExecutionService.send(invocationId, request)
// Wait for every collector to complete.
117 responseFlowsDeferred.awaitAll()
// NOTE(review): closeChannel is called on the original instance, while every other call goes
// through the spy — confirm this closes the channel the spy actually used.
118 streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
123 private fun getRequest(requestId: String): ExecutionServiceInput {
124 val commonHeader = CommonHeader.newBuilder()
125 .setTimestamp("2012-04-23T18:25:43.511Z")
126 .setOriginatorId("System")
127 .setRequestId(requestId)
128 .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build()
131 val actionIdentifier = ActionIdentifiers.newBuilder()
132 .setActionName("SampleScript")
133 .setBlueprintName("sample-cba")
134 .setBlueprintVersion("1.0.0")
137 return ExecutionServiceInput.newBuilder()
138 .setCommonHeader(commonHeader)
139 .setActionIdentifiers(actionIdentifier)
140 //.setPayload(payloadBuilder.build())