Removed redundant timeout handling for executeCommand
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / blueprints / blueprint-core / src / test / kotlin / org / onap / ccsdk / cds / controllerblueprints / core / service / BlueprintWorkflowServiceTest.kt
1 /*
2  *  Copyright © 2019 IBM.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.controllerblueprints.core.service
18
19 import io.mockk.every
20 import io.mockk.mockk
21 import kotlinx.coroutines.CompletableDeferred
22 import kotlinx.coroutines.async
23 import kotlinx.coroutines.awaitAll
24 import kotlinx.coroutines.coroutineScope
25 import kotlinx.coroutines.runBlocking
26 import kotlinx.coroutines.withTimeout
27 import org.junit.Test
28 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
29 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
30 import org.onap.ccsdk.cds.controllerblueprints.core.data.EdgeLabel
31 import org.onap.ccsdk.cds.controllerblueprints.core.data.Graph
32 import org.onap.ccsdk.cds.controllerblueprints.core.logger
33 import org.onap.ccsdk.cds.controllerblueprints.core.toGraph
34 import kotlin.test.assertNotNull
35
/**
 * Drives [TestBlueprintWorkFlowService] through a set of small workflow graphs.
 *
 * Every test builds a graph from its textual definition, seeds the simulated
 * per-node outcomes, executes the workflow and asserts that a response came back.
 */
class BlueprintWorkflowServiceTest {

    /** Linear chain A..E in which every node is simulated as SUCCESS. */
    @Test
    fun testSimpleFlow() {
        runBlocking {
            val response = runSimulation(
                "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>E/SUCCESS, E>END/SUCCESS]",
                successes = listOf("A", "B", "C", "D", "E")
            )
            assertNotNull(response, "failed to get response")
        }
    }

    /** Two independent workflows (ids 12345 and 12346) started with `async` and awaited together. */
    @Test
    fun testMultipleFlows() {
        runBlocking {
            coroutineScope {
                val pendingWorkflows = listOf("12345", "12346").map { workflowId ->
                    async {
                        val response = runSimulation(
                            "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]",
                            successes = listOf("A", "B", "C", "D"),
                            workflowId = workflowId
                        )
                        assertNotNull(response, "failed to get response")
                    }
                }
                pendingWorkflows.awaitAll()
            }
        }
    }

    /** Node B is simulated as FAILURE while the graph declares only SUCCESS edges out of B. */
    @Test
    fun testMissingEdgeForBFailureState() {
        runBlocking {
            val response = runSimulation(
                "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]",
                successes = listOf("A", "C", "D", "E"),
                failures = listOf("B")
            )
            assertNotNull(response, "failed to get response")
        }
    }

    /** Node B has no simulated state at all, so the test service throws when asked to execute it. */
    @Test
    fun testBExceptionFlow() {
        runBlocking {
            val response = runSimulation(
                "[START>A/SUCCESS, A>B/SUCCESS, B>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]",
                successes = listOf("A", "C", "D", "E")
            )
            assertNotNull(response, "failed to get response")
        }
    }

    /** The graph contains the node id "TO", for which the test service simulates a timeout. */
    @Test
    fun testTimeoutExceptionFlow() {
        runBlocking {
            val response = runSimulation(
                "[START>A/SUCCESS, A>TO/SUCCESS, TO>C/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]",
                successes = listOf("A", "TO", "C", "D", "E")
            )
            assertNotNull(response, "failed to get response")
        }
    }

    /** A has both a SUCCESS edge (to B) and a FAILURE edge (to C); A is simulated as SUCCESS. */
    @Test
    fun testConditionalFlow() {
        runBlocking {
            val response = runSimulation(
                "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]",
                successes = listOf("A", "B", "C", "D", "E")
            )
            assertNotNull(response, "failed to get response")
        }
    }

    /** Same conditional graph as above, but A is simulated as FAILURE. */
    @Test
    fun testBothConditionalFlow() {
        runBlocking {
            // Failure Flow
            val failurePathResponse = runSimulation(
                "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]",
                successes = listOf("B", "C", "D", "E"),
                failures = listOf("A")
            )
            assertNotNull(failurePathResponse, "failed to get response")
        }
    }

    /** A is simulated as SUCCESS, so the FAILURE branch A > C > D is not the path taken into E. */
    @Test
    fun testMultipleSkipFlow() {
        runBlocking {
            val response = runSimulation(
                "[START>A/SUCCESS, A>B/SUCCESS, A>C/FAILURE, C>D/SUCCESS, D>E/SUCCESS, B>E/SUCCESS, E>END/SUCCESS]",
                successes = listOf("A", "B", "C", "D", "E")
            )
            assertNotNull(response, "failed to get response")
        }
    }

    /** A fans out to B and C over SUCCESS edges and both join again at D. */
    @Test
    fun testParallelFlow() {
        runBlocking {
            val response = runSimulation(
                "[START>A/SUCCESS, A>B/SUCCESS, A>C/SUCCESS, B>D/SUCCESS, C>D/SUCCESS, D>END/SUCCESS]",
                successes = listOf("A", "B", "C", "D")
            )
            assertNotNull(response, "failed to get response")
        }
    }

    /**
     * Parses [graphDefinition], seeds a fresh [TestBlueprintWorkFlowService] with the
     * simulated node outcomes and executes the workflow.
     *
     * @param graphDefinition textual graph definition handed to `toGraph()`.
     * @param successes node ids simulated as SUCCESS, or `null` for none.
     * @param failures node ids simulated as FAILURE, or `null` for none.
     * @param workflowId used both as the mocked runtime service id and as the workflow input.
     * @return the value returned by `executeWorkflow`.
     */
    private suspend fun runSimulation(
        graphDefinition: String,
        successes: List<String>?,
        failures: List<String>? = null,
        workflowId: String = "123456"
    ): String {
        val workflowGraph = graphDefinition.toGraph()
        val workflowService = TestBlueprintWorkFlowService()
        workflowService.simulatedState = prepareSimulation(successes, failures)
        return workflowService.executeWorkflow(workflowGraph, mockBlueprintRuntimeService(workflowId), workflowId)
    }

    /** Creates a mocked runtime service whose `id()` returns [id]. */
    private fun mockBlueprintRuntimeService(id: String = "123456"): BlueprintRuntimeService<*> =
        mockk<BlueprintRuntimeService<*>>().also { runtime ->
            every { runtime.id() } returns id
        }

    /**
     * Builds the node id to outcome map consumed by [TestBlueprintWorkFlowService].
     *
     * Failures are written after successes, so an id present in both lists ends up as FAILURE.
     */
    private fun prepareSimulation(successes: List<String>?, failures: List<String>?): MutableMap<String, EdgeLabel> {
        val outcomes = hashMapOf<String, EdgeLabel>()
        for (nodeId in successes.orEmpty()) {
            outcomes[nodeId] = EdgeLabel.SUCCESS
        }
        for (nodeId in failures.orEmpty()) {
            outcomes[nodeId] = EdgeLabel.FAILURE
        }
        return outcomes
    }
}
186
/**
 * Test implementation of [AbstractBlueprintWorkFlowService] with `String` input and output.
 *
 * Node outcomes are not computed; they are looked up by node id in [simulatedState].
 * A node with the id "TO" additionally runs into a simulated timeout.
 */
class TestBlueprintWorkFlowService :
    AbstractBlueprintWorkFlowService<String, String>() {

    val log = logger(TestBlueprintWorkFlowService::class)

    /** Node id to simulated outcome; must be assigned before any node is executed or skipped. */
    lateinit var simulatedState: MutableMap<String, EdgeLabel>

    /** Always reports a successful initialization; [input] is not inspected. */
    override suspend fun initializeWorkflow(input: String): EdgeLabel {
        return EdgeLabel.SUCCESS
    }

    /**
     * Stores [graph] and the runtime service id on this instance, sends a
     * [WorkflowExecuteMessage] to the workflow actor and suspends until its output completes.
     *
     * @param graph workflow graph to execute.
     * @param bluePrintRuntimeService only its `id()` is read, as the workflow id.
     * @param input payload carried by the start message.
     * @return the value the workflow completes the start message's output with.
     * @throws BlueprintProcessorException if the workflow actor is already closed for send.
     */
    override suspend fun executeWorkflow(graph: Graph, bluePrintRuntimeService: BlueprintRuntimeService<*>, input: String): String {
        log.info("Executing Graph : $graph")
        this.graph = graph
        this.workflowId = bluePrintRuntimeService.id()
        val output = CompletableDeferred<String>()
        val startMessage = WorkflowExecuteMessage(input, output)
        val workflowActor = workflowActor()
        if (workflowActor.isClosedForSend) {
            throw BlueprintProcessorException("workflow actor is closed for send $workflowActor")
        }
        // Send through the same actor instance that was just checked; calling
        // workflowActor() a second time would address a different call result.
        workflowActor.send(startMessage)
        return startMessage.output.await()
    }

    /** Wraps [node] in an execute message with a "<node> Input" payload and an empty output. */
    override suspend fun prepareNodeExecutionMessage(node: Graph.Node): NodeExecuteMessage<String, String> {
        return NodeExecuteMessage(node, "$node Input", "")
    }

    /**
     * Returns the simulated outcome registered for [node].
     *
     * @throws BlueprintException if [simulatedState] has no entry for the node id.
     */
    override suspend fun executeNode(
        node: Graph.Node,
        nodeInput: String,
        nodeOutput: String
    ): EdgeLabel {
        // Simulation for timeout: a 2 ms delay inside a 1 ms timeout makes withTimeout throw.
        if (node.id == "TO") {
            withTimeout(1) {
                kotlinx.coroutines.delay(2)
            }
        }
        return simulatedState[node.id] ?: throw BlueprintException("failed to get status for the node($node)")
    }

    /** Wraps [node] in a skip message with a "<node> Skip Input" payload and an empty output. */
    override suspend fun prepareNodeSkipMessage(node: Graph.Node): NodeSkipMessage<String, String> {
        val nodeOutput = ""
        return NodeSkipMessage(node, "$node Skip Input", nodeOutput)
    }

    /**
     * Returns the simulated outcome registered for the skipped [node].
     *
     * @throws BlueprintException if [simulatedState] has no entry for the node id.
     */
    override suspend fun skipNode(
        node: Graph.Node,
        nodeInput: String,
        nodeOutput: String
    ): EdgeLabel {
        return simulatedState[node.id] ?: throw BlueprintException("failed to get status for the node($node)")
    }

    /** Not supported by this test double; always throws [NotImplementedError]. */
    override suspend fun cancelNode(
        node: Graph.Node,
        nodeInput: String,
        nodeOutput: String
    ): EdgeLabel {
        TODO("not implemented")
    }

    /** Not supported by this test double; always throws [NotImplementedError]. */
    override suspend fun restartNode(
        node: Graph.Node,
        nodeInput: String,
        nodeOutput: String
    ): EdgeLabel {
        TODO("not implemented")
    }

    /** Logs every entry of `exceptions` and returns the fixed response text. */
    override suspend fun prepareWorkflowOutput(): String {
        // forEach is a no-op on an empty collection, so no emptiness guard is needed.
        exceptions.forEach {
            log.error("workflow($workflowId) exceptions :", it)
        }
        return "Final Response"
    }
}