Message Prioritization message group lock.
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / AbstractMessagePrioritizeProcessor.kt
index c2965c4..35566ab 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
 import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
@@ -29,6 +32,7 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
 
     lateinit var prioritizationConfiguration: PrioritizationConfiguration
     lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+    var clusterService: BluePrintClusterService? = null
 
     override fun init(context: ProcessorContext) {
         this.processorContext = context
@@ -36,4 +40,12 @@ abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessa
         this.messagePrioritizationStateService = BluePrintDependencyService
             .messagePrioritizationStateService()
     }
+
+    /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
+    open fun initializeClusterService() {
+        /** Get the Cluster service to update in store */
+        if (BluePrintConstants.CLUSTER_ENABLED) {
+            this.clusterService = BluePrintDependencyService.clusterService()
+        }
+    }
 }