9a5be0151aa267bb9c3db5fa0cc41ae4dbe40f80
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
18
19 import com.google.protobuf.util.JsonFormat
20 import io.grpc.inprocess.InProcessChannelBuilder
21 import io.grpc.inprocess.InProcessServerBuilder
22 import io.grpc.testing.GrpcCleanupRule
23 import io.mockk.coEvery
24 import io.mockk.mockk
25 import io.mockk.spyk
26 import kotlinx.coroutines.*
27 import kotlinx.coroutines.flow.collect
28 import org.junit.Rule
29 import org.junit.Test
30 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
31 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
32 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
33 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
34 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
35 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
36 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
37 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
38 import org.onap.ccsdk.cds.controllerblueprints.core.logger
39 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
40 import java.util.*
41 import kotlin.test.assertEquals
42 import kotlin.test.assertNotNull
43
44
/**
 * Tests [StreamingRemoteExecutionServiceImpl] against a [MockBluePrintProcessingServer] that is served by an
 * in-process gRPC server. The service under test is wrapped in a MockK spy whose `createGrpcChannel` is stubbed
 * to return the in-process channel.
 */
class StreamingRemoteExecutionServiceTest {

    val log = logger(StreamingRemoteExecutionServiceTest::class)

    /** JUnit rule with which the in-process server and channel created by the test are registered. */
    @get:Rule
    val grpcCleanup = GrpcCleanupRule()
    private val serverName = InProcessServerBuilder.generateName()
    private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
    private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()

    /**
     * Client properties handed to every service call in the test. Because `createGrpcChannel` is stubbed,
     * the host and port are presumably only relevant when running against a real server.
     */
    private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
        host = "127.0.0.1"
        port = 50052
        type = GRPCLibConstants.TYPE_TOKEN_AUTH
        token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
    }

    /**
     * Runs two concurrent non-interactive request/response exchanges and asserts on each response, then opens
     * two subscriptions, sends one request on each and collects responses until an
     * [EventType.EVENT_COMPONENT_EXECUTED] response triggers cancellation of that subscription.
     */
    @Test
    @ExperimentalCoroutinesApi
    @FlowPreview
    fun testStreamingChannel() {
        grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
        val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())

        runBlocking {
            val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())

            val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)

            val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
            /** To test with a real server, comment out the stub below so a real channel is created */
            coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel

            /** Test Send and Receive non interactive transaction */
            val nonInteractiveDeferred = List(2) { count ->
                val requestId = "1234-$count"
                val request = getRequest(requestId)
                val invocationId = request.commonHeader.subRequestId
                async {
                    val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties,
                            invocationId, request, 1000L)
                    assertNotNull(response, "failed to get non interactive response")
                    // kotlin.test.assertEquals takes (expected, actual, message).
                    assertEquals(requestId, response.commonHeader.requestId,
                            "failed to match non interactive response id")
                    assertEquals(EventType.EVENT_COMPONENT_EXECUTED, response.status.eventType,
                            "failed to match non interactive response type")
                }
            }
            nonInteractiveDeferred.awaitAll()

            /** Test Send and Receive interactive transaction */
            val responseFlowsDeferred = List(2) { count ->
                val requestId = "12345-$count"
                val request = getRequest(requestId)
                val invocationId = request.commonHeader.requestId
                val responseFlow = spyStreamingRemoteExecutionService
                        .openSubscription(tokenAuthGrpcClientProperties, invocationId)

                // Start collecting before the request is sent.
                val deferred = async {
                    responseFlow.collect {
                        log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
                        if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
                            spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
                        }
                    }
                }
                /** Send the request under the same invocationId the subscription was opened with */
                spyStreamingRemoteExecutionService.send(invocationId, request)
                deferred
            }
            responseFlowsDeferred.awaitAll()
            // NOTE(review): the channel is closed through the un-spied instance, not the spy used above;
            // confirm both share the same channel state.
            streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
        }
    }

    /**
     * Builds a sample [ExecutionServiceInput] for the "SampleScript" action of blueprint "sample-cba" 1.0.0.
     *
     * @param requestId value used as the common header request id and as the prefix of the sub request id,
     * which is completed with a random UUID.
     * @return the request carrying the common header, the action identifiers and a one-entry JSON payload.
     */
    private fun getRequest(requestId: String): ExecutionServiceInput {
        val commonHeader = CommonHeader.newBuilder()
                .setTimestamp("2012-04-23T18:25:43.511Z")
                .setOriginatorId("System")
                .setRequestId(requestId)
                .setSubRequestId("$requestId-${UUID.randomUUID()}")
                .build()

        val actionIdentifier = ActionIdentifiers.newBuilder()
                .setActionName("SampleScript")
                .setBlueprintName("sample-cba")
                .setBlueprintVersion("1.0.0")
                .setMode(ACTION_MODE_SYNC)
                .build()

        // Parse the JSON text into the payload builder before assembling the request.
        val jsonContent = """{ "key1" : "value1" }"""
        val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder
        JsonFormat.parser().merge(jsonContent, payloadBuilder)

        return ExecutionServiceInput.newBuilder()
                .setCommonHeader(commonHeader)
                .setActionIdentifiers(actionIdentifier)
                .setPayload(payloadBuilder.build())
                .build()
    }
}