c9ff235732c6e5bed3b6ee620aa6d96da04fe279
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
18
19 import io.grpc.inprocess.InProcessChannelBuilder
20 import io.grpc.inprocess.InProcessServerBuilder
21 import io.grpc.testing.GrpcCleanupRule
22 import io.mockk.coEvery
23 import io.mockk.mockk
24 import io.mockk.spyk
25 import kotlinx.coroutines.*
26 import kotlinx.coroutines.flow.collect
27 import org.junit.Rule
28 import org.junit.Test
29 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
30 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
32 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
33 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
34 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
38 import java.util.*
39
40
/**
 * Test for [StreamingRemoteExecutionServiceImpl] that runs against an in-process gRPC server
 * hosting [MockBluePrintProcessingServer] instead of a remote endpoint.
 */
class StreamingRemoteExecutionServiceTest {

    val log = logger(StreamingRemoteExecutionServiceTest::class)

    /** JUnit rule with which the in-process server and channel created by the test are registered. */
    @get:Rule
    val grpcCleanup = GrpcCleanupRule()

    // The server and the channel builders share one generated name so the channel reaches the
    // in-process server; both run on the direct executor.
    private val serverName = InProcessServerBuilder.generateName()
    private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
    private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()

    // Client properties handed to the service under test.
    // NOTE(review): createGrpcChannel is stubbed in the test, so host/port are presumably not
    // used to open a real connection there — confirm against the service implementation.
    private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
        host = "127.0.0.1"
        port = 50052
        type = GRPCLibConstants.TYPE_TOKEN_AUTH
        token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
    }

    /**
     * Opens a subscription for a request id, sends one request and collects the responses,
     * cancelling the subscription as soon as an [EventType.EVENT_COMPONENT_EXECUTED] event arrives.
     */
    @Test
    @ExperimentalCoroutinesApi
    @FlowPreview
    fun testStreamingChannel() {
        grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
        val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())

        runBlocking {
            val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())

            val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)

            val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
            // Route the service onto the in-process channel.
            // To test with a real server, comment out the line below.
            coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel

            val responseFlowsDeferred = arrayListOf<Deferred<*>>()

            repeat(1) { count ->
                val requestId = "12345-$count"
                val responseFlow = spyStreamingRemoteExecutionService.openSubscription(tokenAuthGrpcClientProperties, requestId)

                // Collect concurrently so the request below can be sent while the collector is active.
                val deferred = async {
                    responseFlow.collect { response ->
                        log.info("Received $count-response (${response.commonHeader.subRequestId}) : ${response.status.eventType}")
                        if (response.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
                            spyStreamingRemoteExecutionService.cancelSubscription(response.commonHeader.requestId)
                        }
                    }
                }
                responseFlowsDeferred.add(deferred)
                // Send one request; every request built by getRequest carries the given requestId
                // and a freshly generated subRequestId.
                spyStreamingRemoteExecutionService.send(getRequest(requestId))
            }
            responseFlowsDeferred.awaitAll()
            // NOTE(review): the channel is closed through the original instance, not the spy used
            // for every other call above — confirm this is intended.
            streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
        }
    }

    /**
     * Builds an [ExecutionServiceInput] for the "SampleScript" action of blueprint
     * "sample-cba" version "1.0.0".
     *
     * @param requestId value used as the request id and as the prefix of the random sub-request id.
     * @return the request carrying the common header and action identifiers; no payload is set.
     */
    private fun getRequest(requestId: String): ExecutionServiceInput {
        val commonHeader = CommonHeader.newBuilder()
            .setTimestamp("2012-04-23T18:25:43.511Z")
            .setOriginatorId("System")
            .setRequestId(requestId)
            .setSubRequestId("$requestId-${UUID.randomUUID()}")
            .build()

        val actionIdentifier = ActionIdentifiers.newBuilder()
            .setActionName("SampleScript")
            .setBlueprintName("sample-cba")
            .setBlueprintVersion("1.0.0")
            .build()

        return ExecutionServiceInput.newBuilder()
            .setCommonHeader(commonHeader)
            .setActionIdentifiers(actionIdentifier)
            .build()
    }
}