Rest endpoint for message Prioritization 04/100104/1
authorBrinda Santh <bs2796@att.com>
Wed, 8 Jan 2020 17:36:42 +0000 (12:36 -0500)
committerBrinda Santh <bs2796@att.com>
Wed, 8 Jan 2020 17:36:42 +0000 (12:36 -0500)
Refactored to support both Kafka and Rest endpoint prioritization.

Simplified number for Kafka processors.

Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Iba77ed94be3398940840ff01a298f0bec785401f

20 files changed:
ms/blueprintsprocessor/functions/message-prioritizaion/README.md
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt [moved from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractMessagePrioritizeProcessor.kt with 50% similarity]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt [moved from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt with 74% similarity]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt [moved from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt with 97% similarity]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationSerde.kt [moved from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerde.kt with 98% similarity]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt [moved from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt with 53% similarity]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt [moved from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt with 77% similarity]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt [deleted file]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt [deleted file]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
ms/blueprintsprocessor/modules/commons/atomix-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/atomix/BluePrintAtomixLibConfiguration.kt

index 482bbc2..cda43fa 100644 (file)
@@ -3,7 +3,6 @@ To Delete Topics
 ------------------
 kafka-topics --zookeeper localhost:2181 --delete  --topic prioritize-input-topic
 kafka-topics --zookeeper localhost:2181 --delete  --topic prioritize-output-topic
-kafka-topics --zookeeper localhost:2181 --delete  --topic prioritize-expired-topic
 kafka-topics --zookeeper localhost:2181 --delete  --topic test-prioritize-application-PriorityMessage-changelog
 
 Create Topics
@@ -11,7 +10,6 @@ Create Topics
 
 kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic
 kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic
-kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic
 
 To List topics
 ----------------
@@ -26,6 +24,3 @@ To Listen for Output
 kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning
 
 kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning
-
-kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning
-
index 28e0963..890e0a6 100644 (file)
@@ -28,9 +28,6 @@ object MessagePrioritizationConstants {
     const val SOURCE_INPUT = "source-prioritization-input"
 
     const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize"
-    const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate"
-    const val PROCESSOR_OUTPUT = "processor-prioritization-output"
 
     const val SINK_OUTPUT = "sink-prioritization-output"
-    const val SINK_EXPIRED = "sink-prioritization-expired"
 }
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
new file mode 100644 (file)
index 0000000..584fd00
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+
+interface MessagePrioritizationService {
+
+    fun setKafkaProcessorContext(processorContext: ProcessorContext?)
+
+    suspend fun prioritize(messagePrioritization: MessagePrioritization)
+
+    suspend fun output(id: String)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
new file mode 100644 (file)
index 0000000..5dd41d7
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import java.util.Date
+
+interface MessagePrioritizationStateService {
+
+    suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
+
+    suspend fun getMessage(id: String): MessagePrioritization
+
+    suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
+
+    suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
+
+    suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
+        List<MessagePrioritization>?
+
+    suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
+        List<MessagePrioritization>?
+
+    suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
+
+    suspend fun getCorrelatedMessages(
+        group: String,
+        states: List<String>,
+        types: List<String>?,
+        correlationIds: String
+    ): List<MessagePrioritization>?
+
+    suspend fun updateMessagesState(ids: List<String>, state: String)
+
+    suspend fun updateMessageState(id: String, state: String): MessagePrioritization
+
+    suspend fun setMessageState(id: String, state: String)
+
+    suspend fun setMessagesPriority(ids: List<String>, priority: String)
+
+    suspend fun setMessagesState(ids: List<String>, state: String)
+
+    suspend fun setMessageStateANdError(id: String, state: String, error: String)
+
+    suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
+
+    suspend fun deleteMessage(id: String)
+
+    suspend fun deleteMessageByGroup(group: String)
+
+    suspend fun deleteMessageStates(group: String, states: List<String>)
+
+    suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
+}
index bef7a7b..05b820a 100644 (file)
@@ -17,7 +17,6 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
index 262dcb4..e90771f 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api
 
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
 import org.springframework.http.MediaType
 import org.springframework.web.bind.annotation.GetMapping
@@ -31,7 +32,10 @@ import org.springframework.web.bind.annotation.RestController
 
 @RestController
 @RequestMapping(value = ["/api/v1/message-prioritization"])
-open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) {
+open class MessagePrioritizationApi(
+    private val messagePrioritizationStateService: MessagePrioritizationStateService,
+    private val messagePrioritizationService: MessagePrioritizationService
+) {
 
     @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE])
     @ResponseBody
@@ -52,6 +56,15 @@ open class MessagePrioritizationApi(private val messagePrioritizationStateServic
         messagePrioritizationStateService.saveMessage(messagePrioritization)
     }
 
+    @PostMapping(
+        path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE],
+        consumes = [MediaType.APPLICATION_JSON_VALUE]
+    )
+    @ResponseBody
+    fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc {
+        messagePrioritizationService.prioritize(messagePrioritization)
+    }
+
     @PostMapping(
         path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
         consumes = [MediaType.APPLICATION_JSON_VALUE]
  * limitations under the License.
  */
 
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
 
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
-/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */
+/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
 abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
 
     private val log = logger(AbstractMessagePrioritizeProcessor::class)
 
     lateinit var prioritizationConfiguration: PrioritizationConfiguration
-    lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
-    var clusterService: BluePrintClusterService? = null
 
-    override fun init(context: ProcessorContext) {
-        this.processorContext = context
-        /** Get the State service to update in store */
-        this.messagePrioritizationStateService = BluePrintDependencyService
-            .messagePrioritizationStateService()
-    }
-
-    /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
-    open fun initializeClusterService() {
-        /** Get the Cluster service to update in store */
-        if (BluePrintConstants.CLUSTER_ENABLED) {
-            this.clusterService = BluePrintDependencyService.clusterService()
-        }
+    override fun init(processorContext: ProcessorContext) {
+        this.processorContext = processorContext
     }
 }
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
new file mode 100644 (file)
index 0000000..c14a404
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.Cancellable
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.PunctuationType
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.time.Duration
+
+open class DefaultMessagePrioritizeProcessor(
+    private val messagePrioritizationStateService: MessagePrioritizationStateService,
+    private val messagePrioritizationService: MessagePrioritizationService
+) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+
+    private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+    lateinit var expiryCancellable: Cancellable
+    lateinit var cleanCancellable: Cancellable
+
+    override suspend fun processNB(key: ByteArray, value: ByteArray) {
+
+        val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+            ?: throw BluePrintProcessorException("failed to convert")
+        try {
+            messagePrioritizationService.setKafkaProcessorContext(processorContext)
+            messagePrioritizationService.prioritize(messagePrioritize)
+        } catch (e: Exception) {
+            messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
+            log.error(messagePrioritize.error)
+            /** Update the data store */
+            messagePrioritizationStateService.setMessageStateANdError(
+                messagePrioritize.id, MessageState.ERROR.name,
+                messagePrioritize.error!!
+            )
+            /** Publish to Output topic */
+            this.processorContext.forward(
+                messagePrioritize.id, messagePrioritize,
+                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+            )
+        }
+    }
+
+    override fun init(context: ProcessorContext) {
+        super.init(context)
+        /** set up expiry marking cron */
+        initializeExpiryPunctuator()
+        /** Set up cleaning records cron */
+        initializeCleanPunctuator()
+    }
+
+    override fun close() {
+        log.info(
+            "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+                "taskId(${processorContext.taskId()})"
+        )
+        expiryCancellable.cancel()
+        cleanCancellable.cancel()
+    }
+
+    open fun initializeExpiryPunctuator() {
+        val expiryPunctuator =
+            MessagePriorityExpiryPunctuator(
+                messagePrioritizationStateService
+            )
+        expiryPunctuator.processorContext = processorContext
+        expiryPunctuator.configuration = prioritizationConfiguration
+        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+        expiryCancellable = processorContext.schedule(
+            Duration.ofMillis(expiryConfiguration.frequencyMilli),
+            PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
+        )
+        log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
+    }
+
+    open fun initializeCleanPunctuator() {
+        val cleanPunctuator =
+            MessagePriorityCleanPunctuator(
+                messagePrioritizationStateService
+            )
+        cleanPunctuator.processorContext = processorContext
+        cleanPunctuator.configuration = prioritizationConfiguration
+        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+        cleanCancellable = processorContext.schedule(
+            Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+            PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
+        )
+        log.info(
+            "Clean punctuator setup complete with expiry " +
+                "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+        )
+    }
+}
  * limitations under the License.
  */
 
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
 
 import org.apache.kafka.common.serialization.Serdes
 import org.apache.kafka.streams.Topology
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
@@ -42,7 +43,7 @@ open class MessagePrioritizationConsumer(
     }
 
     open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
-            KafkaStreamConsumerFunction {
+        KafkaStreamConsumerFunction {
         return object : KafkaStreamConsumerFunction {
 
             override suspend fun createTopology(
@@ -52,7 +53,7 @@ open class MessagePrioritizationConsumer(
 
                 val topology = Topology()
                 val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
-                        as KafkaStreamsBasicAuthConsumerProperties
+                    as KafkaStreamsBasicAuthConsumerProperties
 
                 val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
                 log.info("Consuming prioritization topics($topics)")
@@ -68,39 +69,12 @@ open class MessagePrioritizationConsumer(
                     MessagePrioritizationConstants.SOURCE_INPUT
                 )
 
-                topology.addProcessor(
-                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
-                    bluePrintProcessorSupplier<String, String>(
-                        MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
-                        prioritizationConfiguration
-                    ),
-                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
-                )
-
-                topology.addProcessor(
-                    MessagePrioritizationConstants.PROCESSOR_OUTPUT,
-                    bluePrintProcessorSupplier<String, String>(
-                        MessagePrioritizationConstants.PROCESSOR_OUTPUT,
-                        prioritizationConfiguration
-                    ),
-                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE
-                )
-
-                topology.addSink(
-                    MessagePrioritizationConstants.SINK_EXPIRED,
-                    prioritizationConfiguration.expiredTopic,
-                    Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
-                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
-                )
-
                 /** To receive completed and error messages */
                 topology.addSink(
                     MessagePrioritizationConstants.SINK_OUTPUT,
                     prioritizationConfiguration.outputTopic,
                     Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
-                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
-                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
-                    MessagePrioritizationConstants.PROCESSOR_OUTPUT
+                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                 )
 
                 // Output will be sent to the group-output topic from Processor API
  * limitations under the License.
  */
 
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
 
 import org.apache.kafka.streams.processor.To
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 
@@ -46,7 +46,7 @@ class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateServ
             fetchMessages.forEach { expired ->
                 processorContext.forward(
                     expired.id, expired,
-                    To.child(MessagePrioritizationConstants.SINK_EXPIRED)
+                    To.child(MessagePrioritizationConstants.SINK_OUTPUT)
                 )
             }
         }
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
 
 import org.apache.kafka.common.serialization.Deserializer
 import org.apache.kafka.common.serialization.Serde
  * limitations under the License.
  */
 
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
 
-import org.apache.kafka.streams.processor.Cancellable
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
 import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
-import java.util.UUID
 
-open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
+abstract class AbstractMessagePrioritizationService(
+    private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : MessagePrioritizationService {
 
-    private val log = logger(MessagePrioritizeProcessor::class)
+    private val log = logger(AbstractMessagePrioritizationService::class)
 
-    lateinit var expiryCancellable: Cancellable
-    lateinit var cleanCancellable: Cancellable
+    var processorContext: ProcessorContext? = null
 
-    override suspend fun processNB(key: ByteArray, value: ByteArray) {
-        log.info("***** received in prioritize processor key(${String(key)})")
-        val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
-            ?: throw BluePrintProcessorException("failed to convert")
+    override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
+        this.processorContext = processorContext
+    }
+
+    override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
         try {
+            log.info("***** received in prioritize processor key(${messagePrioritize.id})")
             /** Get the cluster lock for message group */
-            val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
+            val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
             // Save the Message
             messagePrioritizationStateService.saveMessage(messagePrioritize)
             handleCorrelationAndNextStep(messagePrioritize)
             /** Cluster unLock for message group */
-            MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
+            MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
         } catch (e: Exception) {
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
             log.error(messagePrioritize.error)
@@ -59,58 +58,16 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
                 messagePrioritize.id, MessageState.ERROR.name,
                 messagePrioritize.error!!
             )
-            /** Publish to Output topic */
-            this.processorContext.forward(
-                messagePrioritize.id, messagePrioritize,
-                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
-            )
         }
     }
 
-    override fun init(context: ProcessorContext) {
-        super.init(context)
-        /** set up expiry marking cron */
-        initializeExpiryPunctuator()
-        /** Set up cleaning records cron */
-        initializeCleanPunctuator()
-        /** Set up Cluster Service */
-        initializeClusterService()
-    }
-
-    override fun close() {
-        log.info(
-            "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
-                "taskId(${processorContext.taskId()})"
-        )
-        expiryCancellable.cancel()
-        cleanCancellable.cancel()
-    }
-
-    open fun initializeExpiryPunctuator() {
-        val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
-        expiryPunctuator.processorContext = processorContext
-        expiryPunctuator.configuration = prioritizationConfiguration
-        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
-        expiryCancellable = processorContext.schedule(
-            Duration.ofMillis(expiryConfiguration.frequencyMilli),
-            PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
-        )
-        log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
-    }
-
-    open fun initializeCleanPunctuator() {
-        val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
-        cleanPunctuator.processorContext = processorContext
-        cleanPunctuator.configuration = prioritizationConfiguration
-        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
-        cleanCancellable = processorContext.schedule(
-            Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
-            PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
-        )
-        log.info(
-            "Clean punctuator setup complete with expiry " +
-                "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
-        )
+    override suspend fun output(id: String) {
+        log.info("$$$$$ received in output processor id($id)")
+        val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
+        /** Check for Kafka Processing, If yes, then send to the output topic */
+        if (this.processorContext != null) {
+            processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+        }
     }
 
     open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
@@ -126,10 +83,11 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
             )
 
             /** Get all previously received messages from database for group and optional types and correlation Id */
-            val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
-                group,
-                arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
-            )
+            val waitingCorrelatedStoreMessages = messagePrioritizationStateService
+                .getCorrelatedMessages(
+                    group,
+                    arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+                )
 
             /** If multiple records found, then check correlation */
             if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
@@ -139,12 +97,9 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
 
                 if (correlationResults.correlated) {
                     /** Correlation  satisfied */
-                    val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
-                    /**  Send only correlated ids to next processor */
-                    this.processorContext.forward(
-                        UUID.randomUUID().toString(), correlatedIds,
-                        To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-                    )
+                    val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
+                    /**  Send only correlated ids to aggregate processor */
+                    aggregate(correlatedIds)
                 } else {
                     /** Correlation not satisfied */
                     log.trace("correlation not matched : ${correlationResults.message}")
@@ -159,16 +114,57 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
         } else {
             // No Correlation check needed, simply forward to next processor.
             messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
-            this.processorContext.forward(
-                messagePrioritization.id, messagePrioritization.id,
-                To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-            )
+            aggregate(messagePrioritization.id)
+        }
+    }
+
+    open suspend fun aggregate(strIds: String) {
+        log.info("@@@@@ received in aggregation processor ids($strIds)")
+        val ids = strIds.split(",").map { it.trim() }
+        if (!ids.isNullOrEmpty()) {
+            try {
+                if (ids.size == 1) {
+                    /** No aggregation or sequencing needed, simpley forward to next processor */
+                    output(ids.first())
+                } else {
+                    /** Implement Aggregation logic in overridden class, If necessary,
+                    Populate New Message and Update status with Prioritized, Forward the message to next processor */
+                    handleAggregation(ids)
+                    /** Update all messages to Aggregated state */
+                    messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+                }
+            } catch (e: Exception) {
+                val error = "failed in Aggregate message($ids) : ${e.message}"
+                log.error(error, e)
+                val storeMessages = messagePrioritizationStateService.getMessages(ids)
+                if (!storeMessages.isNullOrEmpty()) {
+                    storeMessages.forEach { messagePrioritization ->
+                        try {
+                            /** Update the data store */
+                            messagePrioritizationStateService.setMessageStateANdError(
+                                messagePrioritization.id,
+                                MessageState.ERROR.name, error
+                            )
+                            /** Publish to output topic */
+                            output(messagePrioritization.id)
+                        } catch (sendException: Exception) {
+                            log.error(
+                                "failed to update/publish error message(${messagePrioritization.id}) : " +
+                                    "${sendException.message}", e
+                            )
+                        }
+                    }
+                }
+            }
         }
     }
 
+    /** Child will override this implementation , if necessary
+     *  Here the place child has to implement custom Sequencing and Aggregation logic.
+     * */
+    abstract suspend fun handleAggregation(messageIds: List<String>)
+
     /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
      * otherwise correlation happens with group and correlationId */
-    open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
-        return null
-    }
+    abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
 }
@@ -16,6 +16,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
 
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
@@ -27,59 +28,10 @@ import org.springframework.stereotype.Service
 import org.springframework.transaction.annotation.Transactional
 import java.util.Date
 
-interface MessagePrioritizationStateService {
-
-    suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
-
-    suspend fun getMessage(id: String): MessagePrioritization
-
-    suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
-
-    suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
-
-    suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
-            List<MessagePrioritization>?
-
-    suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
-            List<MessagePrioritization>?
-
-    suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
-
-    suspend fun getCorrelatedMessages(
-        group: String,
-        states: List<String>,
-        types: List<String>?,
-        correlationIds: String
-    ): List<MessagePrioritization>?
-
-    suspend fun updateMessagesState(ids: List<String>, state: String)
-
-    suspend fun updateMessageState(id: String, state: String): MessagePrioritization
-
-    suspend fun setMessageState(id: String, state: String)
-
-    suspend fun setMessagesPriority(ids: List<String>, priority: String)
-
-    suspend fun setMessagesState(ids: List<String>, state: String)
-
-    suspend fun setMessageStateANdError(id: String, state: String, error: String)
-
-    suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
-
-    suspend fun deleteMessage(id: String)
-
-    suspend fun deleteMessageByGroup(group: String)
-
-    suspend fun deleteMessageStates(group: String, states: List<String>)
-
-    suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
-}
-
 @Service
 open class MessagePrioritizationStateServiceImpl(
     private val prioritizationMessageRepository: PrioritizationMessageRepository
-) :
-    MessagePrioritizationStateService {
+) : MessagePrioritizationStateService {
 
     private val log = logger(MessagePrioritizationStateServiceImpl::class)
 
@@ -110,7 +62,7 @@ open class MessagePrioritizationStateServiceImpl(
     }
 
     override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
-            List<MessagePrioritization>? {
+        List<MessagePrioritization>? {
         return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
             group,
             states, Date(), PageRequest.of(0, count)
@@ -118,7 +70,7 @@ open class MessagePrioritizationStateServiceImpl(
     }
 
     override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
-            List<MessagePrioritization>? {
+        List<MessagePrioritization>? {
         return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
             group,
             states, Date(), PageRequest.of(0, count)
@@ -126,7 +78,7 @@ open class MessagePrioritizationStateServiceImpl(
     }
 
     override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
-            List<MessagePrioritization>? {
+        List<MessagePrioritization>? {
         return prioritizationMessageRepository.findByByGroupAndExpiredDate(
             group,
             expiryDate, PageRequest.of(0, count)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
new file mode 100644 (file)
index 0000000..fcdb71c
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+    AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+    private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+    /** Child overriding this implementation , if necessary */
+    override suspend fun handleAggregation(messageIds: List<String>) {
+        log.info("messages($messageIds) aggregated")
+        messageIds.forEach { id ->
+            output(id)
+        }
+    }
+
+    /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+     * otherwise correlation happens with group and correlationId */
+    override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+        return when (messagePrioritization.group) {
+            /** Dummy Implementation, This can also be read from file and stored as cached map **/
+            "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+            else -> null
+        }
+    }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
deleted file mode 100644 (file)
index 3e697e6..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
-
-    private val log = logger(MessageAggregateProcessor::class)
-
-    override suspend fun processNB(key: String, value: String) {
-
-        log.info("@@@@@ received in aggregation processor key($key), value($value)")
-        val ids = value.split(",").map { it.trim() }
-        if (!ids.isNullOrEmpty()) {
-            try {
-                if (ids.size == 1) {
-                    processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
-                } else {
-                    /** Implement Aggregation logic in overridden class, If necessary,
-                    Populate New Message and Update status with Prioritized, Forward the message to next processor */
-                    handleAggregation(ids)
-                    /** Update all messages to Aggregated state */
-                    messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
-                }
-            } catch (e: Exception) {
-                val error = "failed in Aggregate message($ids) : ${e.message}"
-                log.error(error, e)
-                val storeMessages = messagePrioritizationStateService.getMessages(ids)
-                if (!storeMessages.isNullOrEmpty()) {
-                    storeMessages.forEach { messagePrioritization ->
-                        try {
-                            /** Update the data store */
-                            messagePrioritizationStateService.setMessageStateANdError(
-                                messagePrioritization.id,
-                                MessageState.ERROR.name, error
-                            )
-                            /** Publish to Error topic */
-                            this.processorContext.forward(
-                                messagePrioritization.id, messagePrioritization,
-                                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
-                            )
-                        } catch (sendException: Exception) {
-                            log.error(
-                                "failed to update/publish error message(${messagePrioritization.id}) : " +
-                                        "${sendException.message}", e
-                            )
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /** Child will override this implementation , if necessary */
-    open suspend fun handleAggregation(messageIds: List<String>) {
-        log.info("messages($messageIds) aggregated")
-        messageIds.forEach { id ->
-            processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
-        }
-    }
-}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
deleted file mode 100644 (file)
index cf6520d..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
-
-    private val log = logger(MessageOutputProcessor::class)
-
-    override suspend fun processNB(key: String, value: String) {
-        log.info("$$$$$ received in output processor key($key), value($value)")
-        val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name)
-        processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
-    }
-}
index 49230b6..186499d 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
 
 import org.apache.kafka.streams.processor.ProcessorSupplier
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
 object MessageProcessorUtils {
 
-    /** Utility to create the cluster lock for message [messagePrioritization] */
-    suspend fun prioritizationGrouplock(
-        clusterService: BluePrintClusterService?,
-        messagePrioritization: MessagePrioritization
-    ): ClusterLock? {
+    /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/
+    suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? {
+        val clusterService = BluePrintDependencyService.optionalClusterService()
+
         return if (clusterService != null && clusterService.clusterJoined() &&
             !messagePrioritization.correlationId.isNullOrBlank()
         ) {
             // Get the correlation key in ascending order, even it it is misplaced
             val correlationId = messagePrioritization.toFormatedCorrelation()
-            val lockName = "prioritization-${messagePrioritization.group}-$correlationId"
+            val lockName = "prioritize::${messagePrioritization.group}::$correlationId"
             val clusterLock = clusterService.clusterLock(lockName)
             clusterLock.lock()
             if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
@@ -46,14 +45,15 @@ object MessageProcessorUtils {
         } else null
     }
 
-    /** Utility used to cluster unlock for message [messagePrioritization] */
-    suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) {
-        if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) {
+    /** Utility used to cluster unlock for message [clusterLock] */
+    suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) {
+        if (clusterLock != null) {
             clusterLock.unLock()
             clusterLock.close()
         }
     }
 
+    /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/
     fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
         ProcessorSupplier<K, V> {
         return ProcessorSupplier<K, V> {
index f9e23e8..ec0515c 100644 (file)
@@ -27,12 +27,13 @@ import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
@@ -43,6 +44,7 @@ import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
 import kotlin.test.Test
 import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
 
 @RunWith(SpringRunner::class)
 @DataJpaTest
@@ -72,6 +74,8 @@ import kotlin.test.assertNotNull
 )
 open class MessagePrioritizationConsumerTest {
 
+    private val log = logger(MessagePrioritizationConsumerTest::class)
+
     @Autowired
     lateinit var applicationContext: ApplicationContext
 
@@ -81,6 +85,9 @@ open class MessagePrioritizationConsumerTest {
     @Autowired
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 
+    @Autowired
+    lateinit var messagePrioritizationService: MessagePrioritizationService
+
     @Autowired
     lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
 
@@ -106,6 +113,38 @@ open class MessagePrioritizationConsumerTest {
         }
     }
 
+    @Test
+    fun testMessagePrioritizationService() {
+        runBlocking {
+            assertTrue(
+                ::messagePrioritizationService.isInitialized,
+                "failed to initialize messagePrioritizationService"
+            )
+
+            log.info("****************  without Correlation **************")
+            /** Checking without correlation */
+            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+                messagePrioritizationService.prioritize(it)
+            }
+            log.info("****************  Same Group , with Correlation **************")
+            /** checking same group with correlation */
+            MessagePrioritizationSample
+                .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+                .forEach {
+                    delay(10)
+                    messagePrioritizationService.prioritize(it)
+                }
+            log.info("****************  Different Type , with Correlation **************")
+            /** checking different type, with correlation */
+            MessagePrioritizationSample
+                .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+                .forEach {
+                    delay(10)
+                    messagePrioritizationService.prioritize(it)
+                }
+        }
+    }
+
     @Test
     fun testStartConsuming() {
         runBlocking {
@@ -118,7 +157,9 @@ open class MessagePrioritizationConsumerTest {
             val spyStreamingConsumerService = spyk(streamingConsumerService)
             coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
             coEvery { spyStreamingConsumerService.shutDown() } returns Unit
-            val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+            val messagePrioritizationConsumer = MessagePrioritizationConsumer(
+                bluePrintMessageLibPropertyService
+            )
             val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
 
             // Test Topology
index 3d3d0c6..0285079 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
 import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.context.annotation.Bean
@@ -65,22 +64,17 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio
  */
 
 @Service
-open class SampleMessagePrioritizationConsumer(
+open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+    SampleMessagePrioritizationService(messagePrioritizationStateService)
+
+/** For Kafka Consumer  **/
+@Service
+open class TestMessagePrioritizationConsumer(
     bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 ) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
 
 @Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
-    override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
-        return when (messagePrioritization.group) {
-            "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
-            else -> null
-        }
-    }
-}
-
-@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-open class SampleMessageAggregateProcessor() : MessageAggregateProcessor()
-
-@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
-open class SampleMessageOutputProcessor : MessageOutputProcessor()
+open class TestMessagePrioritizeProcessor(
+    messagePrioritizationStateService: MessagePrioritizationStateService,
+    messagePrioritizationService: MessagePrioritizationService
+) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService)
index 8ea1593..8ef2903 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.atomix
 
 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.springframework.context.annotation.Configuration
 
@@ -29,3 +30,10 @@ open class BluePrintAtomixLibConfiguration
  */
 fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
     instance(AtomixBluePrintClusterService::class)
+
+/** Optional Cluster Service, returns only if Cluster is enabled */
+fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? {
+    return if (BluePrintConstants.CLUSTER_ENABLED) {
+        BluePrintDependencyService.clusterService()
+    } else null
+}