Provide correct output to #process request
[ccsdk/apps.git] / ms / blueprintsprocessor / modules / inbounds / selfservice-api / src / main / kotlin / org / onap / ccsdk / apps / blueprintsprocessor / selfservice / api / ExecutionServiceHandler.kt
index 0b361d8..4447dd4 100644 (file)
@@ -16,6 +16,9 @@
 
 package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api
 
+import com.fasterxml.jackson.databind.node.JsonNodeFactory
+import com.google.protobuf.Struct
+import io.grpc.stub.StreamObserver
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
@@ -26,6 +29,7 @@ import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceInp
 import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.saveCBAFile
+import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toProto
 import org.onap.ccsdk.apps.blueprintsprocessor.services.workflow.BlueprintDGExecutionService
 import org.onap.ccsdk.apps.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.apps.controllerblueprints.core.BluePrintException
@@ -56,21 +60,34 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC
         }
     }
 
-    fun process(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
-        return when {
+    fun process(executionServiceInput: ExecutionServiceInput,
+                responseObserver: StreamObserver<org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput>,
+                inputPayload: Struct) {
+        when {
             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
                 GlobalScope.launch(Dispatchers.Default) {
-                    // TODO post result in DMaaP
                     val executionServiceOutput = doProcess(executionServiceInput)
+                    responseObserver.onNext(executionServiceOutput.toProto())
+                    responseObserver.onCompleted()
                 }
-                response(executionServiceInput)
+                responseObserver.onNext(response(executionServiceInput).toProto())
             }
-            executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> doProcess(executionServiceInput)
-            else -> response(executionServiceInput, true)
+            executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> {
+                val executionServiceOutput = doProcess(executionServiceInput)
+                responseObserver.onNext(executionServiceOutput.toProto())
+                responseObserver.onCompleted()
+            }
+            else -> responseObserver.onNext(response(executionServiceInput,
+                "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
+                true).toProto());
         }
     }
 
-    fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
+    fun processSync(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
+        return doProcess(executionServiceInput)
+    }
+
+    private fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
         val requestId = executionServiceInput.commonHeader.requestId
         log.info("processing request id $requestId")
 
@@ -87,13 +104,15 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC
         return blueprintDGExecutionService.executeDirectedGraph(blueprintRuntimeService, executionServiceInput)
     }
 
-    fun response(executionServiceInput: ExecutionServiceInput, failure: Boolean = false): ExecutionServiceOutput {
+    private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "",
+                         failure: Boolean = false): ExecutionServiceOutput {
         val executionServiceOutput = ExecutionServiceOutput()
         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
-        executionServiceOutput.payload = executionServiceInput.payload
+        executionServiceOutput.payload = JsonNodeFactory.instance.objectNode()
 
         val status = Status()
+        status.errorMessage = errorMessage
         if (failure) {
             status.eventType = "EVENT-COMPONENT-FAILURE"
             status.code = 500