Rolling upgrade support for in-flight requests 26/98426/1
authorSebastien Premont-Tendland <sebastien.premont@bell.ca>
Thu, 31 Oct 2019 19:37:48 +0000 (15:37 -0400)
committerSebastien Premont-Tendland <sebastien.premont@bell.ca>
Thu, 14 Nov 2019 19:52:38 +0000 (14:52 -0500)
Three entry points are being handled :
 1 - REST endpoint
 2 - gRPC endpoint
 3 - Kafka consumer

We make use of Phaser object to make sure the PreDestroy callback wait
for all requests to be executed before stopping the process.

The docker image was also modified to make sure the java process becomes
PID 1 in the container in order to catch the SIGTERM signal which
triggers the PreDestroy callback of Spring. This was done by using
the "exec" command in bash.

Issue-ID: CCSDK-1885
Signed-off-by: Sebastien Premont-Tendland <sebastien.premont@bell.ca>
Change-Id: I64611e569ddb78499aed2375e6186f028d1d8fa0

ms/blueprintsprocessor/distribution/src/main/docker/Dockerfile
ms/blueprintsprocessor/distribution/src/main/docker/startService.sh
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingGRPCHandler.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt

index 207cec5..2a85f1c 100755 (executable)
@@ -1,7 +1,6 @@
 FROM omahoco1/alpine-java-python
 
 # add entrypoint
-COPY run.source /etc/run.source
 COPY startService.sh /startService.sh
 RUN chmod 777 /startService.sh && dos2unix /startService.sh
 
@@ -12,4 +11,4 @@ RUN tar -xzf /source.tar.gz -C /tmp \
  && rm -rf /source.tar.gz \
  && rm -rf /tmp/@project.build.finalName@
 
-ENTRYPOINT /startService.sh
+ENTRYPOINT [ "/startService.sh" ]
index d1e09dd..752b552 100644 (file)
@@ -7,4 +7,15 @@ export APP_HOME=/opt/app/onap
 
 keytool -import -trustcacerts -keystore $JAVA_HOME/jre/lib/security/cacerts -storepass changeit -alias ONAP -import -file $APP_CONFIG_HOME/ONAP_RootCA.cer
 
-source /etc/run.source
+exec java -classpath "/etc:${APP_HOME}/lib/*:/lib/*:/src:/schema:/generated-sources:${APP_CONFIG_HOME}:${APP_HOME}" \
+-DappName=${APPLICATIONNAME} -DappVersion=${BUNDLEVERSION} \
+-DrouteOffer=${ROUTEOFFER} \
+-DVERSION_ROUTEOFFER_ENVCONTEXT=${BUNDLEVERSION}/${STICKYSELECTORKEY}/${ENVCONTEXT} \
+-DSecurityFilePath=/etc \
+-DREST_NAME_NORMALIZER_PATTERN_FILE=/etc/PatternInputs.txt \
+-Dms_name=org.onap.ccsdk.cds.blueprintsprocessor \
+-Dlogging.config=${APP_CONFIG_HOME}/logback.xml \
+-Djava.security.egd=file:/dev/./urandom \
+-DAPPNAME=${APP_NAME} -DAPPENV=${APP_ENV} -DAPPVERSION=${APP_VERSION} -DNAMESPACE=${NAMESPACE} \
+-Dspring.config.location=${APP_CONFIG_HOME}/ \
+org.onap.ccsdk.cds.blueprintsprocessor.BlueprintProcessorApplication
index eb450d7..636a554 100644 (file)
@@ -27,6 +27,8 @@ import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOu
 import org.slf4j.LoggerFactory
 import org.springframework.security.access.prepost.PreAuthorize
 import org.springframework.stereotype.Service
+import java.util.concurrent.Phaser
+import javax.annotation.PreDestroy
 
 @Service
 open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration: BluePrintCoreConfiguration,
@@ -34,6 +36,8 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
     : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
     private val log = LoggerFactory.getLogger(BluePrintProcessingGRPCHandler::class.java)
 
+    private val ph = Phaser(1)
+
     @PreAuthorize("hasRole('USER')")
     override fun process(
             responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
@@ -41,12 +45,16 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
         return object : StreamObserver<ExecutionServiceInput> {
             override fun onNext(executionServiceInput: ExecutionServiceInput) {
                 try {
+                    ph.register()
                     runBlocking {
                         executionServiceHandler.process(executionServiceInput.toJava(), responseObserver)
                     }
                 } catch (e: Exception) {
                     onError(e)
                 }
+                finally {
+                    ph.arriveAndDeregister()
+                }
             }
 
             override fun onError(error: Throwable) {
@@ -61,4 +69,12 @@ open class BluePrintProcessingGRPCHandler(private val bluePrintCoreConfiguration
             }
         }
     }
+
+    @PreDestroy
+    fun preDestroy() {
+        val name = "BluePrintProcessingGRPCHandler"
+        log.info("Starting to shutdown $name waiting for in-flight requests to finish ...")
+        ph.arriveAndAwaitAdvance()
+        log.info("Done waiting in $name")
+    }
 }
\ No newline at end of file
index b339903..a9dda7e 100644 (file)
@@ -29,6 +29,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
 import org.springframework.boot.context.event.ApplicationReadyEvent
 import org.springframework.context.event.EventListener
 import org.springframework.stereotype.Service
+import java.util.concurrent.Phaser
 import javax.annotation.PreDestroy
 
 @ConditionalOnProperty(name = ["blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable"],
@@ -40,6 +41,8 @@ open class BluePrintProcessingKafkaConsumer(
 
     val log = logger(BluePrintProcessingKafkaConsumer::class)
 
+    private val ph = Phaser(1)
+
     private lateinit var blueprintMessageConsumerService: BlueprintMessageConsumerService
 
     companion object {
@@ -76,6 +79,7 @@ open class BluePrintProcessingKafkaConsumer(
                 channel.consumeEach { message ->
                     launch {
                         try {
+                            ph.register()
                             log.trace("Consumed Message : $message")
                             val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
                             val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
@@ -85,6 +89,9 @@ open class BluePrintProcessingKafkaConsumer(
                         } catch (e: Exception) {
                             log.error("failed in processing the consumed message : $message", e)
                         }
+                        finally {
+                            ph.arriveAndDeregister()
+                        }
                     }
                 }
             }
@@ -100,6 +107,7 @@ open class BluePrintProcessingKafkaConsumer(
             log.info("Shutting down message consumer($CONSUMER_SELECTOR) and " +
                     "message producer($PRODUCER_SELECTOR)...")
             blueprintMessageConsumerService.shutDown()
+            ph.arriveAndAwaitAdvance()
         } catch (e: Exception) {
             log.error("failed to shutdown message listener($CONSUMER_SELECTOR)", e)
         }
index d48f0c7..cc12296 100644 (file)
@@ -28,12 +28,15 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutp
 import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.http.MediaType
 import org.springframework.http.ResponseEntity
 import org.springframework.http.codec.multipart.FilePart
 import org.springframework.security.access.prepost.PreAuthorize
 import org.springframework.web.bind.annotation.*
+import java.util.concurrent.Phaser
+import javax.annotation.PreDestroy
 
 @RestController
 @RequestMapping("/api/v1/execution-service")
@@ -41,6 +44,10 @@ import org.springframework.web.bind.annotation.*
         description = "Interaction with CBA.")
 open class ExecutionServiceController {
 
+    private val log = logger(ExecutionServiceController::class)
+
+    private val ph = Phaser(1)
+
     @Autowired
     lateinit var executionServiceHandler: ExecutionServiceHandler
 
@@ -90,7 +97,18 @@ open class ExecutionServiceController {
                 if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) {
                     throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.")
                 }
+
+                ph.register()
                 val processResult = executionServiceHandler.doProcess(executionServiceInput)
+                ph.arriveAndDeregister()
                 ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code))
             }
+
+    @PreDestroy
+    fun preDestroy() {
+        val name = "ExecutionServiceController"
+        log.info("Starting to shutdown $name waiting for in-flight requests to finish ...")
+        ph.arriveAndAwaitAdvance()
+        log.info("Done waiting in $name")
+    }
 }