Properly handle async gRPC request 24/79024/2
authorAlexis de Talhouët <adetalhouet89@gmail.com>
Fri, 22 Feb 2019 15:31:14 +0000 (10:31 -0500)
committerAlexis de Talhouët <adetalhouet89@gmail.com>
Fri, 22 Feb 2019 15:32:15 +0000 (15:32 +0000)
Change-Id: Ia5016759466c57f749c146a004374c2cbff60f9d
Issue-ID: CCSDK-947
Signed-off-by: Alexis de Talhouët <adetalhouet89@gmail.com>
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/apps/blueprintsprocessor/selfservice/api/utils/BluePrintMappings.kt

index 453306d..cf6776c 100644 (file)
@@ -19,7 +19,6 @@ package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api
 import io.grpc.stub.StreamObserver
 import org.onap.ccsdk.apps.blueprintsprocessor.core.BluePrintCoreConfiguration
 import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toJava
-import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toProto
 import org.onap.ccsdk.apps.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceInput
 import org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput
@@ -32,14 +31,14 @@ class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: Blu
     : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
     private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java)
 
-    override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>?): StreamObserver<ExecutionServiceInput> {
+    override fun process(
+        responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
 
         return object : StreamObserver<ExecutionServiceInput> {
             override fun onNext(executionServiceInput: ExecutionServiceInput) {
                 try {
-                    val output = executionServiceHandler.process(executionServiceInput.toJava())
-                            .toProto(executionServiceInput.payload)
-                    responseObserver?.onNext(output)
+                    val inputPayload = executionServiceInput.payload
+                    executionServiceHandler.process(executionServiceInput.toJava(), responseObserver, inputPayload)
                 } catch (e: Exception) {
                     onError(e)
                 }
@@ -47,13 +46,13 @@ class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: Blu
 
             override fun onError(error: Throwable) {
                 log.debug("Fail to process message", error)
-                responseObserver?.onError(io.grpc.Status.INTERNAL
-                        .withDescription(error.message)
-                        .asException())
+                responseObserver.onError(io.grpc.Status.INTERNAL
+                    .withDescription(error.message)
+                    .asException())
             }
 
             override fun onCompleted() {
-                responseObserver?.onCompleted()
+                log.info("Completed")
             }
         }
     }
index e4734c4..6477c06 100644 (file)
@@ -17,6 +17,7 @@
 package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api
 
 import io.swagger.annotations.ApiOperation
+import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
 import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.springframework.beans.factory.annotation.Autowired
@@ -49,15 +50,19 @@ class ExecutionServiceController {
     @ResponseBody
     fun upload(@RequestPart("file") parts: Mono<FilePart>): Mono<String> {
         return parts
-                .filter { it is FilePart }
-                .ofType(FilePart::class.java)
-                .flatMap(executionServiceHandler::upload)
+            .filter { it is FilePart }
+            .ofType(FilePart::class.java)
+            .flatMap(executionServiceHandler::upload)
     }
 
     @RequestMapping(path = ["/process"], method = [RequestMethod.POST], produces = [MediaType.APPLICATION_JSON_VALUE])
-    @ApiOperation(value = "Resolve Resource Mappings", notes = "Takes the blueprint information and process as per the payload")
+    @ApiOperation(value = "Resolve Resource Mappings",
+        notes = "Takes the blueprint information and process as per the payload")
     @ResponseBody
     fun process(@RequestBody executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
-        return executionServiceHandler.process(executionServiceInput)
+        if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) {
+            throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.")
+        }
+        return executionServiceHandler.processSync(executionServiceInput)
     }
 }
index ec605c1..262c33f 100644 (file)
@@ -16,6 +16,8 @@
 
 package org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api
 
+import com.google.protobuf.Struct
+import io.grpc.stub.StreamObserver
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
@@ -26,6 +28,7 @@ import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceInp
 import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.Status
 import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.saveCBAFile
+import org.onap.ccsdk.apps.blueprintsprocessor.selfservice.api.utils.toProto
 import org.onap.ccsdk.apps.blueprintsprocessor.services.workflow.BlueprintDGExecutionService
 import org.onap.ccsdk.apps.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.apps.controllerblueprints.core.BluePrintException
@@ -56,20 +59,33 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC
         }
     }
 
-    fun process(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
-        return when {
+    fun process(executionServiceInput: ExecutionServiceInput,
+                responseObserver: StreamObserver<org.onap.ccsdk.apps.controllerblueprints.processing.api.ExecutionServiceOutput>,
+                inputPayload: Struct) {
+        when {
             executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC -> {
                 GlobalScope.launch(Dispatchers.Default) {
-                    // TODO post result in DMaaP
                     val executionServiceOutput = doProcess(executionServiceInput)
+                    responseObserver.onNext(executionServiceOutput.toProto(inputPayload))
+                    responseObserver.onCompleted()
                 }
-                response(executionServiceInput)
+                responseObserver.onNext(response(executionServiceInput).toProto(inputPayload))
             }
-            executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> doProcess(executionServiceInput)
-            else -> response(executionServiceInput, "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", true)
+            executionServiceInput.actionIdentifiers.mode == ACTION_MODE_SYNC -> {
+                val executionServiceOutput = doProcess(executionServiceInput)
+                responseObserver.onNext(executionServiceOutput.toProto(inputPayload))
+                responseObserver.onCompleted()
+            }
+            else -> responseObserver.onNext(response(executionServiceInput,
+                "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.",
+                true).toProto(inputPayload));
         }
     }
 
+    fun processSync(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
+        return doProcess(executionServiceInput)
+    }
+
     private fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput {
         val requestId = executionServiceInput.commonHeader.requestId
         log.info("processing request id $requestId")
@@ -87,7 +103,8 @@ class ExecutionServiceHandler(private val bluePrintCoreConfiguration: BluePrintC
         return blueprintDGExecutionService.executeDirectedGraph(blueprintRuntimeService, executionServiceInput)
     }
 
-    fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "", failure: Boolean = false): ExecutionServiceOutput {
+    private fun response(executionServiceInput: ExecutionServiceInput, errorMessage: String = "",
+                         failure: Boolean = false): ExecutionServiceOutput {
         val executionServiceOutput = ExecutionServiceOutput()
         executionServiceOutput.commonHeader = executionServiceInput.commonHeader
         executionServiceOutput.actionIdentifiers = executionServiceInput.actionIdentifiers
index 220a6fd..c8ce1c3 100644 (file)
@@ -139,7 +139,7 @@ fun Flag.toJava(): org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.Flags {
 fun org.onap.ccsdk.apps.blueprintsprocessor.core.api.data.Status.toProto(): Status {
     val status = Status.newBuilder()
     status.code = this.code
-    status.errorMessage = this.errorMessage
+    status.errorMessage = this.errorMessage ?: ""
     status.message = this.message
     status.timestamp = this.timestamp.toString()
     status.eventType = this.eventType