Revert "Renaming Files having BluePrint to have Blueprint"
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / services / execution-service / src / test / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / services / execution / StreamingRemoteExecutionServiceTest.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
18
19 import com.google.protobuf.util.JsonFormat
20 import io.grpc.inprocess.InProcessChannelBuilder
21 import io.grpc.inprocess.InProcessServerBuilder
22 import io.grpc.testing.GrpcCleanupRule
23 import io.mockk.coEvery
24 import io.mockk.mockk
25 import io.mockk.spyk
26 import kotlinx.coroutines.Deferred
27 import kotlinx.coroutines.ExperimentalCoroutinesApi
28 import kotlinx.coroutines.FlowPreview
29 import kotlinx.coroutines.async
30 import kotlinx.coroutines.awaitAll
31 import kotlinx.coroutines.flow.collect
32 import kotlinx.coroutines.runBlocking
33 import org.junit.Rule
34 import org.junit.Test
35 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_SYNC
36 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
37 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
38 import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
39 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
40 import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
41 import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
42 import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
43 import org.onap.ccsdk.cds.controllerblueprints.core.logger
44 import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
45 import java.util.UUID
46 import kotlin.test.assertEquals
47 import kotlin.test.assertNotNull
48
/**
 * Tests [StreamingRemoteExecutionServiceImpl] against an in-process gRPC server that
 * hosts [MockBluePrintProcessingServer].
 *
 * The service under test is wrapped in a MockK spy whose `createGrpcChannel` is stubbed
 * to hand back the in-process channel, so no real network connection is opened.
 */
class StreamingRemoteExecutionServiceTest {

    val log = logger(StreamingRemoteExecutionServiceTest::class)

    // Shuts down every server/channel registered with it once the test finishes.
    @get:Rule
    val grpcCleanup = GrpcCleanupRule()
    private val serverName = InProcessServerBuilder.generateName()
    private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
    private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()

    // Client properties handed to the service; the channel itself is replaced by the
    // in-process one through the spy stub in the test below.
    private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
        host = "127.0.0.1"
        port = 50052
        type = GRPCLibConstants.TYPE_TOKEN_AUTH
        token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
    }

    /**
     * Runs two non-interactive and two interactive exchanges over the in-process channel.
     *
     * Non-interactive: each request is sent with `sendNonInteractive` and the response must
     * echo the request id and carry [EventType.EVENT_COMPONENT_EXECUTED].
     *
     * Interactive: a subscription is opened per request id, the response flow is collected
     * concurrently, and the subscription is cancelled as soon as an
     * [EventType.EVENT_COMPONENT_EXECUTED] response is seen.
     */
    @Test
    @ExperimentalCoroutinesApi
    @FlowPreview
    fun testStreamingChannel() {
        grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
        val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())

        runBlocking {
            val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())

            val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)

            val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
            // To test with a real server, comment out the line below.
            coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel

            // Test send and receive of non-interactive transactions.
            val nonInteractiveDeferred: List<Deferred<*>> = List(2) { count ->
                val requestId = "1234-$count"
                val request = getRequest(requestId)
                val invocationId = request.commonHeader.subRequestId
                async {
                    val response = spyStreamingRemoteExecutionService.sendNonInteractive(
                        tokenAuthGrpcClientProperties,
                        invocationId, request, 1000L
                    )
                    assertNotNull(response, "failed to get non interactive response")
                    // kotlin.test.assertEquals takes (expected, actual, message).
                    assertEquals(
                        requestId, response.commonHeader.requestId,
                        "failed to match non interactive response id"
                    )
                    assertEquals(
                        EventType.EVENT_COMPONENT_EXECUTED, response.status.eventType,
                        "failed to match non interactive response type"
                    )
                }
            }
            nonInteractiveDeferred.awaitAll()

            // Test send and receive of interactive transactions.
            val responseFlowsDeferred: List<Deferred<*>> = List(2) { count ->
                val requestId = "12345-$count"
                val request = getRequest(requestId)
                val invocationId = request.commonHeader.requestId
                val responseFlow = spyStreamingRemoteExecutionService
                    .openSubscription(tokenAuthGrpcClientProperties, invocationId)

                // Start collecting before sending so the collector is in place for the responses.
                val collector = async {
                    responseFlow.collect { response ->
                        log.info("Received $count-response ($invocationId) : ${response.status.eventType}")
                        if (response.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
                            spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
                        }
                    }
                }
                // Sending multiple messages with the same requestId and different subRequestId.
                spyStreamingRemoteExecutionService.send(invocationId, request)
                collector
            }
            responseFlowsDeferred.awaitAll()
            // NOTE(review): the channel is closed through the un-spied instance rather than the
            // spy used for every other call — confirm this is intentional.
            streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
        }
    }

    /**
     * Builds an [ExecutionServiceInput] for the "SampleScript" action of blueprint
     * "sample-cba" version "1.0.0" in sync mode, with a small fixed JSON payload.
     *
     * @param requestId value used as the common header request id; the sub request id is
     * this value followed by `-` and a random UUID.
     * @return the assembled request message.
     */
    private fun getRequest(requestId: String): ExecutionServiceInput {
        val commonHeader = CommonHeader.newBuilder()
            .setTimestamp("2012-04-23T18:25:43.511Z")
            .setOriginatorId("System")
            .setRequestId(requestId)
            .setSubRequestId("$requestId-${UUID.randomUUID()}")
            .build()

        val actionIdentifier = ActionIdentifiers.newBuilder()
            .setActionName("SampleScript")
            .setBlueprintName("sample-cba")
            .setBlueprintVersion("1.0.0")
            .setMode(ACTION_MODE_SYNC)
            .build()

        // Parse the JSON text into the payload builder obtained from a scratch input builder.
        val jsonContent = """{ "key1" : "value1" }"""
        val payloadBuilder = ExecutionServiceInput.newBuilder().payloadBuilder
        JsonFormat.parser().merge(jsonContent, payloadBuilder)

        return ExecutionServiceInput.newBuilder()
            .setCommonHeader(commonHeader)
            .setActionIdentifiers(actionIdentifier)
            .setPayload(payloadBuilder.build())
            .build()
    }
}