29d24c6ad9f607e119d7d7b20c8dc687ffc05e35
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
18
19 import io.grpc.inprocess.InProcessChannelBuilder
20 import io.grpc.inprocess.InProcessServerBuilder
21 import io.grpc.testing.GrpcCleanupRule
22 import io.mockk.coEvery
23 import io.mockk.mockk
24 import io.mockk.spyk
25 import kotlinx.coroutines.*
26 import kotlinx.coroutines.flow.collect
27 import org.junit.Rule
28 import org.junit.Test
29 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
30 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
32 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
33 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
34 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
36 import org.onap.ccsdk.cds.controllerblueprints.core.logger
37 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
38 import java.util.*
39 import kotlin.test.assertEquals
40 import kotlin.test.assertNotNull
41
42
/**
 * Tests [StreamingRemoteExecutionServiceImpl] against an in-process gRPC server that hosts
 * [MockBluePrintProcessingServer].
 *
 * Two usage styles are covered in a single test:
 * - non-interactive: `sendNonInteractive` is called and its single response is asserted;
 * - interactive: a subscription is opened, a request is sent, and the response flow is collected
 *   until an `EVENT_COMPONENT_EXECUTED` response arrives.
 */
class StreamingRemoteExecutionServiceTest {

    val log = logger(StreamingRemoteExecutionServiceTest::class)

    /** JUnit rule with which the in-process server and channel created by the test are registered. */
    @get:Rule
    val grpcCleanup = GrpcCleanupRule()
    private val serverName = InProcessServerBuilder.generateName()
    private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
    private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()

    /**
     * Client properties handed to every service call. While `createGrpcChannel` is stubbed (see
     * the test body) the in-process channel is returned regardless of the argument passed in.
     */
    private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
        host = "127.0.0.1"
        port = 50052
        type = GRPCLibConstants.TYPE_TOKEN_AUTH
        token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
    }

    /**
     * Starts the in-process mock server, then runs two concurrent non-interactive requests
     * followed by two concurrent interactive subscriptions through a spied service instance.
     */
    @Test
    @ExperimentalCoroutinesApi
    @FlowPreview
    fun testStreamingChannel() {
        grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
        val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())

        runBlocking {
            val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())

            val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)

            val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
            // Stub channel creation so that the service uses the in-process channel.
            // To test against a real server, comment out the line below.
            coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel

            /** Test Send and Receive non interactive transaction */
            val nonInteractiveDeferred = List(2) { count ->
                val requestId = "1234-$count"
                val request = getRequest(requestId)
                // Non-interactive calls are keyed by the (unique) sub request id.
                val invocationId = request.commonHeader.subRequestId
                async {
                    val response = spyStreamingRemoteExecutionService.sendNonInteractive(
                        tokenAuthGrpcClientProperties, invocationId, request, 1000L
                    )
                    assertNotNull(response, "failed to get non interactive response")
                    // kotlin.test.assertEquals takes (expected, actual, message).
                    assertEquals(
                        requestId, response.commonHeader.requestId,
                        "failed to match non interactive response id"
                    )
                    assertEquals(
                        EventType.EVENT_COMPONENT_EXECUTED, response.status.eventType,
                        "failed to match non interactive response type"
                    )
                }
            }
            nonInteractiveDeferred.awaitAll()

            /** Test Send and Receive interactive transaction */
            val responseFlowsDeferred = List(2) { count ->
                val requestId = "12345-$count"
                val request = getRequest(requestId)
                // Interactive subscriptions are keyed by the request id.
                val invocationId = request.commonHeader.requestId
                val responseFlow = spyStreamingRemoteExecutionService
                    .openSubscription(tokenAuthGrpcClientProperties, invocationId)

                // Start collecting before the request is sent; the collector cancels the
                // subscription once the executed event has been received.
                val collector = async {
                    responseFlow.collect { response ->
                        log.info("Received $count-response ($invocationId) : ${response.status.eventType}")
                        if (response.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
                            spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
                        }
                    }
                }
                /** Sending Multiple messages with same requestId  and different subRequestId */
                spyStreamingRemoteExecutionService.send(invocationId, request)
                collector
            }
            responseFlowsDeferred.awaitAll()
            // NOTE(review): closeChannel is invoked on the original instance, not on the spy that
            // handled the calls above - confirm this releases the state the spy was using.
            streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
        }
    }

    /**
     * Builds a sample [ExecutionServiceInput] for the `SampleScript` action of blueprint
     * `sample-cba` version `1.0.0`.
     *
     * @param requestId value stored as the request id of the common header; the sub request id
     * is this value followed by `-` and a random UUID.
     * @return the built request, without a payload.
     */
    private fun getRequest(requestId: String): ExecutionServiceInput {
        val commonHeader = CommonHeader.newBuilder()
            .setTimestamp("2012-04-23T18:25:43.511Z")
            .setOriginatorId("System")
            .setRequestId(requestId)
            .setSubRequestId("$requestId-${UUID.randomUUID()}")
            .build()

        val actionIdentifier = ActionIdentifiers.newBuilder()
            .setActionName("SampleScript")
            .setBlueprintName("sample-cba")
            .setBlueprintVersion("1.0.0")
            .build()

        return ExecutionServiceInput.newBuilder()
            .setCommonHeader(commonHeader)
            .setActionIdentifiers(actionIdentifier)
            // .setPayload(payloadBuilder.build())
            .build()
    }
}