Rolling upgrade support for in-flight requests 11/97811/4
authorSebastien Premont-Tendland <sebastien.premont@bell.ca>
Thu, 31 Oct 2019 19:37:48 +0000 (15:37 -0400)
committerSebastien Premont-Tendland <sebastien.premont@bell.ca>
Tue, 5 Nov 2019 23:05:03 +0000 (18:05 -0500)
Three entry points are being handled :
 1 - REST endpoint
 2 - gRPC endpoint
 3 - Kafka consumer

We make use of Phaser object to make sure the PreDestroy callback wait
for all requests to be executed before stopping the process.

The docker image was also modified to make sure the java process becomes
PID 1 in the container in order to catch the SIGTERM signal which
triggers the PreDestroy callback of Spring. This was done by using
the "exec" command in bash.

Issue-ID: CCSDK-1885
Signed-off-by: Sebastien Premont-Tendland <sebastien.premont@bell.ca>
Change-Id: I3e2a72e26a4c8b7768ebc374ea40aa8d55fb6761

ms/blueprintsprocessor/application/src/main/docker/Dockerfile
ms/blueprintsprocessor/application/src/main/docker/run.source [deleted file]
ms/blueprintsprocessor/application/src/main/docker/startService.sh
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt

index 207cec5..2a85f1c 100755 (executable)
@@ -1,7 +1,6 @@
 FROM omahoco1/alpine-java-python
 
 # add entrypoint
-COPY run.source /etc/run.source
 COPY startService.sh /startService.sh
 RUN chmod 777 /startService.sh && dos2unix /startService.sh
 
@@ -12,4 +11,4 @@ RUN tar -xzf /source.tar.gz -C /tmp \
  && rm -rf /source.tar.gz \
  && rm -rf /tmp/@project.build.finalName@
 
-ENTRYPOINT /startService.sh
+ENTRYPOINT [ "/startService.sh" ]
diff --git a/ms/blueprintsprocessor/application/src/main/docker/run.source b/ms/blueprintsprocessor/application/src/main/docker/run.source
deleted file mode 100755 (executable)
index f3d8c7c..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sources:${APP_CONFIG_HOME}:${APP_HOME}" \
--DappName=${APPLICATIONNAME} -DappVersion=${BUNDLEVERSION} \
--DrouteOffer=${ROUTEOFFER} \
--DVERSION_ROUTEOFFER_ENVCONTEXT=${BUNDLEVERSION}/${STICKYSELECTORKEY}/${ENVCONTEXT} \
--DSecurityFilePath=/etc \
--DREST_NAME_NORMALIZER_PATTERN_FILE=/etc/PatternInputs.txt \
--Dms_name=org.onap.ccsdk.cds.blueprintsprocessor \
--Dlogging.config=${APP_CONFIG_HOME}/logback.xml \
--Djava.security.egd=file:/dev/./urandom \
--DAPPNAME=${APP_NAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \
--Dspring.config.location=${APP_CONFIG_HOME}/ \
-org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt
index 14d772e..11c471f 100644 (file)
@@ -7,4 +7,15 @@ export APP_HOME=/opt/app/onap
 
 keytool -import -noprompt -trustcacerts -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -alias ONAP -import -file $APP_CONFIG_HOME/ONAP_RootCA.cer
 
-source /etc/run.source
+exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sources:${APP_CONFIG_HOME}:${APP_HOME}" \
+-DappName=${APPLICATIONNAME} -DappVersion=${BUNDLEVERSION} \
+-DrouteOffer=${ROUTEOFFER} \
+-DVERSION_ROUTEOFFER_ENVCONTEXT=${BUNDLEVERSION}/${STICKYSELECTORKEY}/${ENVCONTEXT} \
+-DSecurityFilePath=/etc \
+-DREST_NAME_NORMALIZER_PATTERN_FILE=/etc/PatternInputs.txt \
+-Dms_name=org.onap.ccsdk.cds.blueprintsprocessor \
+-Dlogging.config=${APP_CONFIG_HOME}/logback.xml \
+-Djava.security.egd=file:/dev/./urandom \
+-DAPPNAME=${APP_NAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \
+-Dspring.config.location=${APP_CONFIG_HOME}/ \
+org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplicationKt
index eb450d7..636a554 100644 (file)
@@ -27,6 +27,8 @@ import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOu
 import org.slf4j.LoggerFactory
 import org.springframework.security.access.prepost.PreAuthorize
 import org.springframework.stereotype.Service
+import java.util.concurrent.Phaser
+import javax.annotation.PreDestroy
 
 @Service
 open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration,
@@ -34,6 +36,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
     : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
     private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java)
 
+    private val ph = Phaser(1)
+
     @PreAuthorize("hasRole('USER')")
     override fun process(
             responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
@@ -41,12 +45,16 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
         return object : StreamObserver<ExecutionServiceInput> {
             override fun onNext(executionServiceInput: ExecutionServiceInput) {
                 try {
+                    ph.register()
                     runBlocking {
                         executionServiceHandler.process(executionServiceInput.toJava(), responseObserver)
                     }
                 } catch (e: Exception) {
                     onError(e)
                 }
+                finally {
+                    ph.arriveAndDeregister()
+                }
             }
 
             override fun onError(error: Throwable) {
@@ -61,4 +69,12 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
             }
         }
     }
+
+    @PreDestroy
+    fun preDestroy() {
+        val name = "BluePrintProcessingGRPCHandler"
+        log.info("Starting to shutdown $name waiting for in-flight requests to finish ...")
+        ph.arriveAndAwaitAdvance()
+        log.info("Done waiting in $name")
+    }
 }
\ No newline at end of file
index b339903..a9dda7e 100644 (file)
@@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
 import org.springframework.boot.context.event.ApplicationReadyEvent
 import org.springframework.context.event.EventListener
 import org.springframework.stereotype.Service
+import java.util.concurrent.Phaser
 import javax.annotation.PreDestroy
 
 @ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
@@ -40,6 +41,8 @@ open class BluePrintProcessingKafkaConsumer(
 
     val log = logger(BluePrintProcessingKafkaConsumer::class)
 
+    private val ph = Phaser(1)
+
     private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
 
     companion object {
@@ -76,6 +79,7 @@ open class BluePrintProcessingKafkaConsumer(
                 channel.consumeEach { message ->
                     launch {
                         try {
+                            ph.register()
                             log.trace("Consumed Message : $message")
                             val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
                             val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
@@ -85,6 +89,9 @@ open class BluePrintProcessingKafkaConsumer(
                         } catch (e: Exception) {
                             log.error("failed in processing the consumed message : $message", e)
                         }
+                        finally {
+                            ph.arriveAndDeregister()
+                        }
                     }
                 }
             }
@@ -100,6 +107,7 @@ open class BluePrintProcessingKafkaConsumer(
             log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " +
                     "message producer($PRODUCER_SELECTOR)...")
             blueprintMessageConsumerService.shutDown()
+            ph.arriveAndAwaitAdvance()
         } catch (e: Exception) {
             log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
         }
index b246b33..130e23e 100644 (file)
@@ -34,6 +34,8 @@ import org.springframework.http.ResponseEntity
 import org.springframework.security.access.prepost.PreAuthorize
 import org.springframework.web.bind.annotation.*
 import reactor.core.publisher.Mono
+import java.util.concurrent.Phaser
+import javax.annotation.PreDestroy
 
 @RestController
 @RequestMapping("/api/v1/execution-service")
@@ -42,6 +44,8 @@ import reactor.core.publisher.Mono
 open class ExecutionServiceController {
     val log = logger(ExecutionServiceController::class)
 
+    private val ph = Phaser(1)
+
     @Autowired
     lateinit var executionServiceHandler: ExecutionServiceHandler
 
@@ -51,7 +55,6 @@ open class ExecutionServiceController {
     @ResponseBody
     @ApiOperation(value = "Health Check", hidden = true)
     fun executionServiceControllerHealthCheck() = monoMdc(Dispatchers.IO) {
-        log.info("Health check success...")
         "Success".asJsonPrimitive()
     }
 
@@ -69,8 +72,19 @@ open class ExecutionServiceController {
         if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) {
             throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.")
         }
+
+        ph.register()
         val processResult = executionServiceHandler.doProcess(executionServiceInput)
+        ph.arriveAndDeregister()
         ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code))
     }
+
+    @PreDestroy
+    fun preDestroy() {
+        val name = "ExecutionServiceController"
+        log.info("Starting to shutdown $name waiting for in-flight requests to finish ...")
+        ph.arriveAndAwaitAdvance()
+        log.info("Done waiting in $name")
+    }
 }