Add message prioritization module 67/98267/4
authorBrinda Santh <bs2796@att.com>
Tue, 12 Nov 2019 00:35:39 +0000 (19:35 -0500)
committerBrinda Santh <bs2796@att.com>
Wed, 13 Nov 2019 20:18:32 +0000 (15:18 -0500)
Kafka streams based solution for message prioritization using  database store.

Implement initial Abstract Processors, Puntuations and sample Topology for easy plug and play based on situations

Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I9c135604574cc3c642186545e076d6a7c60048d4

25 files changed:
ms/blueprintsprocessor/functions/message-prioritizaion/README.txt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml [new file with mode: 0644]
ms/blueprintsprocessor/functions/pom.xml
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt

diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt b/ms/blueprintsprocessor/functions/message-prioritizaion/README.txt
new file mode 100644 (file)
index 0000000..baf1687
--- /dev/null
@@ -0,0 +1,28 @@
+
+To Delete Topics
+------------------
+kafka-topics --zookeeper localhost:2181 --delete  --topic prioritize-input-topic
+kafka-topics --zookeeper localhost:2181 --delete  --topic prioritize-output-topic
+kafka-topics --zookeeper localhost:2181 --delete  --topic prioritize-expired-topic
+kafka-topics --zookeeper localhost:2181 --delete  --topic test-prioritize-application-PriorityMessage-changelog
+
+Create Topics
+--------------
+
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic
+kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic
+
+To List topics
+----------------
+kafka-topics --list --bootstrap-server localhost:9092
+
+
+To Listen for Output
+----------------------
+kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning
+
+kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning
+
+kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning
+
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/pom.xml
new file mode 100644 (file)
index 0000000..ac46b36
--- /dev/null
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright © 2018-2019 AT&T Intellectual Property.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+        <artifactId>functions</artifactId>
+        <version>0.7.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.onap.ccsdk.cds.blueprintsprocessor.functions</groupId>
+    <artifactId>message-prioritizaion</artifactId>
+
+    <name>Blueprints Processor Function - Message Prioritization</name>
+    <description>Blueprints Processor Function - Message Prioritization</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
+            <artifactId>message-lib</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
new file mode 100644 (file)
index 0000000..d89f713
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */
+abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
+
+    private val log = logger(AbstractMessagePrioritizeProcessor::class)
+
+    lateinit var prioritizationConfiguration: PrioritizationConfiguration
+    lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
+
+    override fun init(context: ProcessorContext) {
+        this.processorContext = context
+        /** Get the State service to update in store */
+        this.messagePrioritizationStateService = BluePrintDependencyService
+                .instance(MessagePrioritizationStateService::class)
+
+    }
+
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConfiguration.kt
new file mode 100644 (file)
index 0000000..cce883c
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+
+@Configuration
+@ComponentScan
+open class MessagePrioritizationConfiguration
+
+
+object MessagePrioritizationConstants {
+
+    const val SOURCE_INPUT = "source-prioritization-input"
+
+    const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize"
+    const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate"
+    const val PROCESSOR_OUTPUT = "processor-prioritization-output"
+
+    const val SINK_OUTPUT = "sink-prioritization-output"
+    const val SINK_EXPIRED = "sink-prioritization-expired"
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
new file mode 100644 (file)
index 0000000..ef9d5a0
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.Topology
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+open class MessagePrioritizationConsumer(
+        private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService) {
+
+    private val log = logger(MessagePrioritizationConsumer::class)
+
+    lateinit var streamingConsumerService: BlueprintMessageConsumerService
+
+    open fun consumerService(selector: String): BlueprintMessageConsumerService {
+        return bluePrintMessageLibPropertyService
+                .blueprintMessageConsumerService(selector)
+    }
+
+    open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration)
+            : KafkaStreamConsumerFunction {
+        return object : KafkaStreamConsumerFunction {
+
+            override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
+                                                additionalConfig: Map<String, Any>?): Topology {
+
+                val topology = Topology()
+                val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
+                        as KafkaStreamsBasicAuthConsumerProperties
+
+                val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+                log.info("Consuming prioritization topics($topics)")
+
+                topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
+
+                topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+                        bluePrintProcessorSupplier<ByteArray, ByteArray>(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+                                prioritizationConfiguration),
+                        MessagePrioritizationConstants.SOURCE_INPUT)
+
+                topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
+                        bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
+                                prioritizationConfiguration),
+                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+
+                topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_OUTPUT,
+                        bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_OUTPUT,
+                                prioritizationConfiguration),
+                        MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+
+                topology.addSink(MessagePrioritizationConstants.SINK_EXPIRED,
+                        prioritizationConfiguration.expiredTopic,
+                        Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+
+                topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT,
+                        prioritizationConfiguration.outputTopic,
+                        Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+                        MessagePrioritizationConstants.PROCESSOR_OUTPUT)
+
+                // Output will be sent to the group-output topic from Processor API
+                return topology
+            }
+        }
+    }
+
+    suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
+        streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)
+
+        // Dynamic Consumer Function to create Topology
+        val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
+        streamingConsumerService.consume(null, consumerFunction)
+    }
+
+    suspend fun shutDown() {
+        if (streamingConsumerService != null) {
+            streamingConsumerService.shutDown()
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
new file mode 100644 (file)
index 0000000..d874cef
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import java.io.Serializable
+
+object MessageActionConstants {
+    const val PRIORITIZE = "prioritize"
+}
+
+enum class MessageState(val id: String) {
+    NEW("new"),
+    WAIT("wait"),
+    EXPIRED("expired"),
+    PRIORITIZED("prioritized"),
+    AGGREGATED("aggregated"),
+    IGNORED("ignored"),
+    COMPLETED("completed"),
+}
+
+open class PrioritizationConfiguration : Serializable {
+    lateinit var expiryConfiguration: ExpiryConfiguration
+    lateinit var shutDownConfiguration: ShutDownConfiguration
+    lateinit var cleanConfiguration: CleanConfiguration
+    lateinit var inputTopicSelector: String // Consumer Configuration Selector
+    lateinit var expiredTopic: String // Publish Configuration Selector
+    lateinit var outputTopic: String // Publish Configuration Selector
+}
+
+open class ExpiryConfiguration : Serializable {
+    var frequencyMilli: Long = 30000L
+    var maxPollRecord: Int = 1000
+}
+
+open class ShutDownConfiguration : Serializable {
+    var waitMill: Long = 30000L
+}
+
+open class CleanConfiguration : Serializable {
+    var frequencyMilli: Long = 30000L
+    var expiredRecordsHoldDays: Int = 5
+}
+
+open class UpdateStateRequest : Serializable {
+    lateinit var id: String
+    var group: String? = null
+    var state: String? = null
+    var notifyMessage: String? = null
+}
+
+data class CorrelationCheckResponse(var message: String? = null,
+                                    var correlated: Boolean = false)
+
+data class TypeCorrelationKey(val type: String, val correlationId: String)
+
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
new file mode 100644 (file)
index 0000000..94fedf4
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+
+fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration)
+        : ProcessorSupplier<K, V> {
+    return ProcessorSupplier<K, V> {
+        // Dynamically resolve the Prioritization Processor
+        val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
+        processorInstance.prioritizationConfiguration = prioritizationConfiguration
+        processorInstance
+    }
+}
+
+fun MessagePrioritization.toFormatedCorrelation(): String {
+    val ascendingKey = this.correlationId!!.split(",")
+            .map { it.trim() }.sorted().joinToString(",")
+    return ascendingKey
+}
+
+fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
+    val ascendingKey = this.correlationId!!.split(",")
+            .map { it.trim() }.sorted().joinToString(",")
+    return TypeCorrelationKey(this.type, ascendingKey)
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
new file mode 100644 (file)
index 0000000..307d932
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
+
+import org.springframework.data.domain.Pageable
+import org.springframework.data.jpa.repository.JpaRepository
+import org.springframework.data.jpa.repository.Modifying
+import org.springframework.data.jpa.repository.Query
+import org.springframework.stereotype.Repository
+import org.springframework.transaction.annotation.Transactional
+import java.util.*
+
+@Repository
+@Transactional(readOnly = true)
+interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization, String> {
+
+    @Query("FROM MessagePrioritization pm WHERE pm.group = :group ORDER BY pm.createdDate asc")
+    fun findByGroup(group: String, count: Pageable): List<MessagePrioritization>?
+
+    @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+            "ORDER BY pm.createdDate asc")
+    fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>?
+
+    @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+            "ORDER BY pm.updatedDate asc")
+    fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable)
+            : List<MessagePrioritization>?
+
+    @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+            "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc")
+    fun findByGroupAndStateInAndNotExpiredDate(group: String, states: List<String>, expiryCheckDate: Date,
+                                               count: Pageable): List<MessagePrioritization>?
+
+    @Query("FROM MessagePrioritization pm WHERE pm.state in :states " +
+            "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
+    fun findByStateInAndExpiredDate(states: List<String>, expiryCheckDate: Date,
+                                    count: Pageable): List<MessagePrioritization>?
+
+    @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+            "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
+    fun findByGroupAndStateInAndExpiredDate(group: String, states: List<String>, expiryCheckDate: Date,
+                                            count: Pageable): List<MessagePrioritization>?
+
+    @Query("FROM MessagePrioritization pm WHERE pm.group = :group " +
+            "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc")
+    fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+
+    @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+            "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc")
+    fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String)
+            : List<MessagePrioritization>?
+
+    @Query("FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
+            "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc")
+    fun findByGroupAndTypesAndCorrelationId(group: String, states: List<String>, types: List<String>,
+                                            correlationId: String): List<MessagePrioritization>?
+
+    @Modifying
+    @Transactional
+    @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id = :id")
+    fun setStatusForMessageId(id: String, state: String): Int
+
+    @Modifying
+    @Transactional
+    @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id IN :ids")
+    fun setStatusForMessageIds(ids: List<String>, state: String): Int
+
+    @Modifying
+    @Transactional
+    @Query("UPDATE MessagePrioritization pm SET pm.aggregatedMessageIds = :aggregatedMessageIds " +
+            "WHERE pm.id = :id")
+    fun setAggregatedMessageIds(id: String, aggregatedMessageIds: String): Int
+
+    @Modifying
+    @Transactional
+    @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group")
+    fun deleteGroup(group: String)
+
+    @Modifying
+    @Transactional
+    @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states")
+    fun deleteGroupAndStateIn(group: String, states: List<String>)
+}
+
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
new file mode 100644 (file)
index 0000000..4973cdf
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db
+
+import com.fasterxml.jackson.annotation.JsonFormat
+import org.hibernate.annotations.Proxy
+import org.springframework.data.annotation.LastModifiedDate
+import org.springframework.data.jpa.domain.support.AuditingEntityListener
+import org.springframework.data.jpa.repository.config.EnableJpaAuditing
+import java.util.*
+import javax.persistence.*
+
+@EnableJpaAuditing
+@EntityListeners(AuditingEntityListener::class)
+@Entity
+@Table(name = "MESSAGE_PRIORITIZATION")
+@Proxy(lazy = false)
+open class MessagePrioritization {
+    @Id
+    @Column(name = "message_id", length = 50)
+    lateinit var id: String
+
+    @Column(name = "message_group", length = 50, nullable = false)
+    lateinit var group: String
+
+    @Column(name = "message_type", length = 50, nullable = false)
+    lateinit var type: String
+
+    /** States Defined by MessageState */
+    @Column(name = "message_state", length = 20, nullable = false)
+    lateinit var state: String
+
+    @Lob
+    @Column(name = "message", nullable = false)
+    var message: String? = null
+
+    @Lob
+    @Column(name = "aggregated_message_ids", nullable = true)
+    var aggregatedMessageIds: String? = null
+
+    @Lob
+    @Column(name = "correlation_id", nullable = true)
+    var correlationId: String? = null
+
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+    @Temporal(TemporalType.TIMESTAMP)
+    @Column(name = "created_date", nullable = false)
+    var createdDate = Date()
+
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+    @LastModifiedDate
+    @Temporal(TemporalType.TIMESTAMP)
+    @Column(name = "updated_date", nullable = false)
+    var updatedDate: Date? = null
+
+    @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+    @Temporal(TemporalType.TIMESTAMP)
+    @Column(name = "expiry_date", nullable = false)
+    var expiryDate: Date? = null
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
new file mode 100644 (file)
index 0000000..e4369fc
--- /dev/null
@@ -0,0 +1,172 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.data.domain.PageRequest
+import org.springframework.stereotype.Service
+import org.springframework.transaction.annotation.Transactional
+import java.util.*
+
+interface MessagePrioritizationStateService {
+
+    suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
+
+    suspend fun getMessage(id: String): MessagePrioritization
+
+    suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
+
+    suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): List<MessagePrioritization>?
+
+    suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): List<MessagePrioritization>?
+
+    suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
+
+    suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, correlationIds: String): List<MessagePrioritization>?
+
+    suspend fun updateMessagesState(ids: List<String>, state: String)
+
+    suspend fun updateMessageState(id: String, state: String): MessagePrioritization
+
+    suspend fun setMessageState(id: String, state: String)
+
+    suspend fun setMessagesState(ids: List<String>, state: String)
+
+    suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedIds: List<String>): MessagePrioritization
+
+    suspend fun deleteMessage(id: String)
+
+    suspend fun deleteMessageByGroup(group: String)
+
+    suspend fun deleteMessageStates(group: String, states: List<String>)
+
+    suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
+}
+
+@Service
+open class MessagePrioritizationStateServiceImpl(
+        private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService {
+
+    private val log = logger(MessagePrioritizationStateServiceImpl::class)
+
+    @Transactional
+    override suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization {
+        if (!message.correlationId.isNullOrBlank()) {
+            message.correlationId = message.toFormatedCorrelation()
+        }
+        message.updatedDate = Date()
+        return prioritizationMessageRepository.save(message)
+    }
+
+    override suspend fun getMessage(id: String): MessagePrioritization {
+        return prioritizationMessageRepository.findById(id).orElseGet(null)
+                ?: throw BluePrintProcessorException("couldn't find message for id($id)")
+    }
+
+    override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
+        return prioritizationMessageRepository
+                .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
+                        Date(), PageRequest.of(0, count))
+    }
+
+    override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int)
+            : List<MessagePrioritization>? {
+        return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(group,
+                states, Date(), PageRequest.of(0, count))
+    }
+
+    override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int)
+            : List<MessagePrioritization>? {
+        return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(group,
+                states, Date(), PageRequest.of(0, count))
+    }
+
+    override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int)
+            : List<MessagePrioritization>? {
+        return prioritizationMessageRepository.findByByGroupAndExpiredDate(group,
+                expiryDate, PageRequest.of(0, count))
+    }
+
+    override suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?,
+                                               correlationIds: String): List<MessagePrioritization>? {
+        return if (!types.isNullOrEmpty()) {
+            prioritizationMessageRepository.findByGroupAndTypesAndCorrelationId(group, states, types, correlationIds)
+        } else {
+            prioritizationMessageRepository.findByGroupAndCorrelationId(group, states, correlationIds)
+        }
+    }
+
+    override suspend fun updateMessagesState(ids: List<String>, state: String) {
+        ids.forEach {
+            val updated = updateMessageState(it, state)
+            log.info("message($it) update to state(${updated.state})")
+        }
+    }
+
+    @Transactional
+    override suspend fun setMessageState(id: String, state: String) {
+        prioritizationMessageRepository.setStatusForMessageId(id, state)
+    }
+
+    @Transactional
+    override suspend fun setMessagesState(ids: List<String>, state: String) {
+        prioritizationMessageRepository.setStatusForMessageIds(ids, state)
+    }
+
+    @Transactional
+    override suspend fun updateMessageState(id: String, state: String): MessagePrioritization {
+        val updateMessage = getMessage(id).apply {
+            this.updatedDate = Date()
+            this.state = state
+        }
+        return saveMessage(updateMessage)
+    }
+
+    override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List<String>)
+            : MessagePrioritization {
+
+        val groupedIds = groupedMessageIds.joinToString(",")
+        val updateMessage = getMessage(id).apply {
+            this.updatedDate = Date()
+            this.state = state
+            this.aggregatedMessageIds = groupedIds
+        }
+        return saveMessage(updateMessage)
+    }
+
+    override suspend fun deleteMessage(id: String) {
+        return prioritizationMessageRepository.deleteById(id)
+    }
+
+    override suspend fun deleteMessageByGroup(group: String) {
+        return prioritizationMessageRepository.deleteGroup(group)
+    }
+
+    override suspend fun deleteMessageStates(group: String, states: List<String>) {
+        return prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
+    }
+
+    override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
+        return prioritizationMessageRepository.deleteGroupAndStateIn(group,
+                arrayListOf(MessageState.EXPIRED.name))
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
new file mode 100644 (file)
index 0000000..8dd4019
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
+
+    private val log = logger(MessageAggregateProcessor::class)
+
+    override suspend fun processNB(key: String, value: String) {
+
+        log.info("@@@@@ received in aggregation processor key($key), value($value)")
+        val ids = value.split(",").map { it.trim() }
+        if (!ids.isNullOrEmpty()) {
+            if (ids.size == 1) {
+                processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
+            } else {
+                /** Implement Aggregation logic in overridden class, If necessary,
+                 Populate New Message and Update status with Prioritized, Forward the message to next processor */
+                handleAggregation(ids)
+                /** Update all messages to Aggregated state */
+                messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+            }
+        }
+    }
+
+    /** Child will override this implementation , if necessary */
+    open suspend fun handleAggregation(messageIds: List<String>) {
+        log.info("messages($messageIds) aggregated")
+        messageIds.forEach { id ->
+            processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageOutputProcessor.kt
new file mode 100644 (file)
index 0000000..34faa1b
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
+
+    private val log = logger(MessageOutputProcessor::class)
+
+    override suspend fun processNB(key: String, value: String) {
+        log.info("$$$$$ received in output processor key($key), value($value)")
+        val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name)
+        processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationPunctuators.kt
new file mode 100644 (file)
index 0000000..a745e03
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+
+class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService)
+    : AbstractBluePrintMessagePunctuator() {
+
+    private val log = logger(MessagePriorityExpiryPunctuator::class)
+    lateinit var configuration: PrioritizationConfiguration
+
+    override suspend fun punctuateNB(timestamp: Long) {
+
+        log.info("**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " +
+                "taskId(${processorContext.taskId()})")
+        val expiryConfiguration = configuration.expiryConfiguration
+        val fetchMessages = messagePrioritizationStateService
+                .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+
+        val expiredIds = fetchMessages?.map { it.id }
+        if (expiredIds != null && expiredIds.isNotEmpty()) {
+            messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+            fetchMessages.forEach { expired ->
+                processorContext.forward(expired.id, expired,
+                        To.child(MessagePrioritizationConstants.SINK_EXPIRED))
+            }
+        }
+    }
+}
+
+class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService)
+    : AbstractBluePrintMessagePunctuator() {
+
+    private val log = logger(MessagePriorityCleanPunctuator::class)
+    lateinit var configuration: PrioritizationConfiguration
+
+    override suspend fun punctuateNB(timestamp: Long) {
+        log.info("**** executing clean punctuator applicationId(${processorContext.applicationId()}), " +
+                "taskId(${processorContext.taskId()})")
+        //TODO
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizationSerdes.kt
new file mode 100644 (file)
index 0000000..00d4547
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.common.serialization.Serializer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.nio.charset.Charset
+
+open class MessagePrioritizationSerde : Serde<MessagePrioritization> {
+
+    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+    }
+
+    override fun close() {
+    }
+
+    override fun deserializer(): Deserializer<MessagePrioritization> {
+        return object : Deserializer<MessagePrioritization> {
+            override fun deserialize(topic: String, data: ByteArray): MessagePrioritization {
+                return JacksonUtils.readValue(String(data), MessagePrioritization::class.java)
+                        ?: throw BluePrintProcessorException("failed to convert")
+            }
+
+            override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+            }
+
+            override fun close() {
+            }
+        }
+    }
+
+    override fun serializer(): Serializer<MessagePrioritization> {
+        return object : Serializer<MessagePrioritization> {
+            override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+            }
+
+            override fun serialize(topic: String?, data: MessagePrioritization): ByteArray {
+                return data.asJsonString().toByteArray(Charset.defaultCharset())
+            }
+
+            override fun close() {
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
new file mode 100644 (file)
index 0000000..5a5aa25
--- /dev/null
@@ -0,0 +1,138 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+
+import org.apache.kafka.streams.processor.Cancellable
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.PunctuationType
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.time.Duration
+import java.util.*
+
+
+open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+
+    private val log = logger(MessagePrioritizeProcessor::class)
+
+    lateinit var expiryCancellable: Cancellable
+    lateinit var cleanCancellable: Cancellable
+
+    override suspend fun processNB(key: ByteArray, value: ByteArray) {
+        log.info("***** received in prioritize processor key(${String(key)})")
+        val data = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+                ?: throw BluePrintProcessorException("failed to convert")
+        // Save the Message
+        messagePrioritizationStateService.saveMessage(data)
+        handleCorrelationAndNextStep(data)
+
+    }
+
+    override fun init(context: ProcessorContext) {
+        super.init(context)
+        /** set up expiry marking cron */
+        initializeExpiryPunctuator()
+        /** Set up cleaning records cron */
+        initializeCleanPunctuator()
+    }
+
+    override fun close() {
+        log.info("closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+                "taskId(${processorContext.taskId()})")
+        expiryCancellable.cancel()
+        cleanCancellable.cancel()
+    }
+
+    open fun initializeExpiryPunctuator() {
+        val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
+        expiryPunctuator.processorContext = processorContext
+        expiryPunctuator.configuration = prioritizationConfiguration
+        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+        expiryCancellable = processorContext.schedule(Duration.ofMillis(expiryConfiguration.frequencyMilli),
+                PunctuationType.WALL_CLOCK_TIME, expiryPunctuator)
+        log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
+    }
+
+    open fun initializeCleanPunctuator() {
+        val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
+        cleanPunctuator.processorContext = processorContext
+        cleanPunctuator.configuration = prioritizationConfiguration
+        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+        cleanCancellable = processorContext.schedule(Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+                PunctuationType.WALL_CLOCK_TIME, cleanPunctuator)
+        log.info("Clean punctuator setup complete with expiry " +
+                "hold(${cleanConfiguration.expiredRecordsHoldDays})days")
+    }
+
+    open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
+        /** Check correlation enabled and correlation field has populated */
+        if (!messagePrioritization.correlationId.isNullOrBlank()) {
+            val id = messagePrioritization.id
+            val group = messagePrioritization.group
+            val correlationId = messagePrioritization.correlationId!!
+            val types = getGroupCorrelationTypes(messagePrioritization)
+            log.info("checking correlation for message($id), group($group), types($types), " +
+                    "correlation id($correlationId)")
+
+            /** Get all previously received messages from database for group and optional types and correlation Id */
+            val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(group,
+                    arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId)
+
+            /** If multiple records found, then check correlation */
+            if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
+                /** Check all correlation satisfies */
+                val correlationResults = MessageCorrelationUtils
+                        .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
+
+                if (correlationResults.correlated) {
+                    /** Correlation  satisfied */
+                    val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
+                    /**  Send only correlated ids to next processor */
+                    this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds,
+                            To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+                } else {
+                    /** Correlation not satisfied */
+                    log.trace("correlation not matched : ${correlationResults.message}")
+                    val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
+                    // Update the Message state to Wait
+                    messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
+                }
+            } else {
+                /** received first message of group and correlation Id, update the message with wait state */
+                messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
+            }
+        } else {
+            // No Correlation check needed, simply forward to next processor.
+            messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
+            this.processorContext.forward(messagePrioritization.id, messagePrioritization.id,
+                    To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+        }
+    }
+
+    /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+     * otherwise correlation happens with group and correlationId */
+    open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+        return null
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtils.kt
new file mode 100644 (file)
index 0000000..cc30af2
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CorrelationCheckResponse
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toTypeNCorrelation
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+object MessageCorrelationUtils {
+
+    /** Assumption is message is of same group **/
+    fun correlatedMessages(collectedMessages: List<MessagePrioritization>): CorrelationCheckResponse {
+        val correlationCheckResponse = CorrelationCheckResponse(message = "not correlated")
+        if (collectedMessages.size > 1) {
+            val filteredMessage = collectedMessages.filter { !it.correlationId.isNullOrBlank() }
+            if (filteredMessage.isNotEmpty()) {
+                val groupedMessage = filteredMessage.groupBy { it.toFormatedCorrelation() }
+                if (groupedMessage.size == 1) {
+                    correlationCheckResponse.correlated = true
+                    correlationCheckResponse.message = null
+                }
+            }
+        } else {
+            correlationCheckResponse.message = "received only one message for that group"
+        }
+        return correlationCheckResponse
+    }
+
+    /** Assumption is message is of same group and checking for required types **/
+    fun correlatedMessagesWithTypes(collectedMessages: List<MessagePrioritization>, types: List<String>?)
+            : CorrelationCheckResponse {
+
+        return if (!types.isNullOrEmpty() && collectedMessages.size > 1) {
+
+            val unknownMessageTypes = collectedMessages.filter { !types.contains(it.type) }.map { it.id }
+            if (!unknownMessageTypes.isNullOrEmpty()) {
+                throw BluePrintProcessorException("Messages($unknownMessageTypes) is not in type of($types)")
+            }
+
+            val copyTypes = types.toTypedArray().copyOf().toMutableList()
+
+            val filteredMessage = collectedMessages.filter {
+                !it.correlationId.isNullOrBlank()
+                        && types.contains(it.type)
+            }
+            var correlatedKeys: MutableSet<String> = mutableSetOf()
+            if (filteredMessage.isNotEmpty()) {
+                val correlatedMap = filteredMessage.groupBy { it.toTypeNCorrelation() }
+                val foundType = correlatedMap.keys.map { it.type }
+                copyTypes.removeAll(foundType)
+                correlatedKeys = correlatedMap.keys.map {
+                    it.correlationId
+                }.toMutableSet()
+            }
+            /** Check if any Types missing and same correlation id for all types */
+            return if (copyTypes.isEmpty()) {
+                if (correlatedKeys.size == 1) CorrelationCheckResponse(correlated = true)
+                else CorrelationCheckResponse(message = "not matching correlation keys($correlatedKeys)")
+            } else {
+                CorrelationCheckResponse(message = "couldn't find types($copyTypes)")
+            }
+        } else {
+            return correlatedMessages(collectedMessages)
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
new file mode 100644 (file)
index 0000000..3281a97
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import java.util.*
+
+object MessagePrioritizationSample {
+
+    fun samplePrioritizationConfiguration(): PrioritizationConfiguration {
+        return PrioritizationConfiguration().apply {
+            inputTopicSelector = "prioritize-input"
+            outputTopic = "prioritize-output-topic"
+            expiredTopic = "prioritize-expired-topic"
+            expiryConfiguration = ExpiryConfiguration().apply {
+                frequencyMilli = 10000L
+                maxPollRecord = 2000
+            }
+            shutDownConfiguration = ShutDownConfiguration().apply {
+                waitMill = 2000L
+            }
+            cleanConfiguration = CleanConfiguration().apply {
+                frequencyMilli = 10000L
+                expiredRecordsHoldDays = 5
+            }
+        }
+    }
+
+    private fun currentDatePlusDays(days: Int): Date {
+        val calender = Calendar.getInstance()
+        calender.add(Calendar.DATE, days)
+        return calender.time
+    }
+
+    fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> {
+        return sampleMessages("sample-group", messageState, count)
+    }
+
+    fun sampleMessages(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
+        val messages: MutableList<MessagePrioritization> = arrayListOf()
+        repeat(count) {
+            val backPressureMessage = createMessage(groupName, messageState,
+                    "sample-type", null)
+            messages.add(backPressureMessage)
+        }
+        return messages
+    }
+
+    fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
+        val messages: MutableList<MessagePrioritization> = arrayListOf()
+        repeat(count) {
+            val backPressureMessage = createMessage(groupName, messageState, "sample-type",
+                    "key1=value1,key2=value2")
+            messages.add(backPressureMessage)
+        }
+        return messages
+    }
+
+    fun sampleMessageWithDifferentTypeSameCorrelation(groupName: String, messageState: String,
+                                                      count: Int): List<MessagePrioritization> {
+        val messages: MutableList<MessagePrioritization> = arrayListOf()
+        repeat(count) {
+            val backPressureMessage = createMessage(groupName, messageState, "type-$it",
+                    "key1=value1,key2=value2")
+            messages.add(backPressureMessage)
+        }
+        return messages
+    }
+
+    fun createMessage(groupName: String, messageState: String, messageType: String,
+                      messageCorrelationId: String?): MessagePrioritization {
+
+        return MessagePrioritization().apply {
+            id = UUID.randomUUID().toString()
+            group = groupName
+            type = messageType
+            state = messageState
+            correlationId = messageCorrelationId
+            message = "I am the Message"
+            createdDate = Date()
+            updatedDate = Date()
+            expiryDate = currentDatePlusDays(3)
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
new file mode 100644 (file)
index 0000000..bd99f72
--- /dev/null
@@ -0,0 +1,175 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import io.mockk.coEvery
+import io.mockk.every
+import io.mockk.spyk
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.Before
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
+import org.springframework.context.ApplicationContext
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.Test
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DataJpaTest
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+    BlueprintPropertyConfiguration::class, BluePrintProperties::class,
+    MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class])
+@TestPropertySource(properties =
+[
+    "spring.jpa.show-sql=true",
+    "spring.jpa.properties.hibernate.show_sql=true",
+    "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
+
+    "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
+    "blueprintsprocessor.messageconsumer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+    "blueprintsprocessor.messageconsumer.prioritize-input.applicationId=test-prioritize-application",
+    "blueprintsprocessor.messageconsumer.prioritize-input.topic=prioritize-input-topic",
+
+    // To send initial test message
+    "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
+    "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
+    "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+])
+open class MessagePrioritizationConsumerTest {
+
+    @Autowired
+    lateinit var applicationContext: ApplicationContext
+
+    @Autowired
+    lateinit var prioritizationMessageRepository: PrioritizationMessageRepository
+
+    @Autowired
+    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+    @Before
+    fun setup() {
+        BluePrintDependencyService.inject(applicationContext)
+    }
+
+    @Test
+    fun testBluePrintKafkaJDBCKeyStore() {
+        runBlocking {
+            assertNotNull(prioritizationMessageRepository, "failed to get prioritizationMessageRepository")
+
+            val messagePrioritizationService: MessagePrioritizationStateService = BluePrintDependencyService
+                    .instance(MessagePrioritizationStateService::class)
+            assertNotNull(messagePrioritizationService, "failed to get messagePrioritizationService")
+
+            MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 1).forEach {
+                val message = messagePrioritizationService.saveMessage(it)
+                val repoResult = messagePrioritizationService.getMessage(message.id)
+                assertNotNull(repoResult, "failed to get inserted message.")
+            }
+        }
+    }
+
+    @Test
+    fun testStartConsuming() {
+        runBlocking {
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+
+            val streamingConsumerService = bluePrintMessageLibPropertyService
+                    .blueprintMessageConsumerService(configuration.inputTopicSelector)
+            assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
+
+            val spyStreamingConsumerService = spyk(streamingConsumerService)
+            coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
+            coEvery { spyStreamingConsumerService.shutDown() } returns Unit
+            val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+            val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
+
+
+            // Test Topology
+            val kafkaStreamConsumerFunction = spyMessagePrioritizationConsumer.kafkaStreamConsumerFunction(configuration)
+            val messageConsumerProperties = bluePrintMessageLibPropertyService
+                    .messageConsumerProperties("blueprintsprocessor.messageconsumer.prioritize-input")
+            val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, null)
+            assertNotNull(topology, "failed to get create topology")
+
+            every { spyMessagePrioritizationConsumer.consumerService(any()) } returns spyStreamingConsumerService
+            spyMessagePrioritizationConsumer.startConsuming(configuration)
+            spyMessagePrioritizationConsumer.shutDown()
+        }
+    }
+
+    /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+    //@Test
+    fun testMessagePrioritizationConsumer() {
+        runBlocking {
+            val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+            messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+            /** Send sample message with every 1 sec */
+            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+                    .blueprintMessageProducerService("prioritize-input") as KafkaBasicAuthMessageProducerService
+            launch {
+             MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+                    delay(100)
+                    val headers: MutableMap<String, String> = hashMapOf()
+                    headers["id"] = it.id
+                    blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+                            headers = headers)
+                }
+
+                MessagePrioritizationSample
+                        .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+                        .forEach {
+                            delay(100)
+                            val headers: MutableMap<String, String> = hashMapOf()
+                            headers["id"] = it.id
+                            blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+                                    headers = headers)
+                        }
+
+                MessagePrioritizationSample
+                        .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+                        .forEach {
+                            delay(2000)
+                            val headers: MutableMap<String, String> = hashMapOf()
+                            headers["id"] = it.id
+                            blueprintMessageProducerService.sendMessageNB(message = it.asJsonString(false),
+                                    headers = headers)
+                        }
+            }
+            delay(10000)
+            messagePrioritizationConsumer.shutDown()
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
new file mode 100644 (file)
index 0000000..4e3eb19
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.PrimaryDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.Configuration
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
+import org.springframework.stereotype.Service
+import javax.sql.DataSource
+
+@Configuration
+@ComponentScan(basePackages = ["org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db"])
+@EnableAutoConfiguration
+open class TestDatabaseConfiguration {
+
+    @Bean("primaryDBLibGenericService")
+    open fun primaryDBLibGenericService(dataSource: DataSource): PrimaryDBLibGenericService {
+        return PrimaryDBLibGenericService(NamedParameterJdbcTemplate(dataSource))
+    }
+}
+
+@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
+open class TestMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
+    override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+        return when (messagePrioritization.group) {
+            "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+            else -> null
+        }
+    }
+}
+
+@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+open class DefaultMessageAggregateProcessor() : MessageAggregateProcessor()
+
+@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
+open class DefaultMessageOutputProcessor : MessageOutputProcessor()
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
new file mode 100644 (file)
index 0000000..b470db9
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import kotlin.test.assertTrue
+
+class MessageCorrelationUtilsTest {
+
+    @Test
+    fun testCorrelationKeysReordered() {
+
+        val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+                "type-0", "key1=value1,key2=value2")
+        val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+                "type-0", "key2=value2,key1=value1")
+
+        val multipleMessages: MutableList<MessagePrioritization> = arrayListOf()
+        multipleMessages.add(message1)
+        multipleMessages.add(message2)
+        val multipleMessagesResponse = MessageCorrelationUtils.correlatedMessages(multipleMessages)
+        assertTrue(multipleMessagesResponse.correlated, "failed in multipleMessages correlated keys reordered")
+    }
+
+    @Test
+    fun differentTypesWithSameCorrelationMessages() {
+        /** With Types **/
+        /* Assumption is Same group with different types */
+        val differentTypesWithSameCorrelationMessages = MessagePrioritizationSample
+                .sampleMessageWithDifferentTypeSameCorrelation("sample-group", MessageState.NEW.name, 3)
+        val differentTypesWithSameCorrelationMessagesResponse = MessageCorrelationUtils.correlatedMessagesWithTypes(
+                differentTypesWithSameCorrelationMessages,
+                arrayListOf("type-0", "type-1", "type-2"))
+        assertTrue(differentTypesWithSameCorrelationMessagesResponse.correlated,
+                "failed to correlate differentTypesWithSameCorrelationMessagesResponse")
+
+        /* Assumption is Same group with different types and one missing expected types,
+        In this case type-3 message is missing */
+        val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes(
+                differentTypesWithSameCorrelationMessages,
+                arrayListOf("type-0", "type-1", "type-2", "type-3"))
+        assertTrue(!differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
+                "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType")
+    }
+
+    @Test
+    fun withSameCorrelationMessagesWithIgnoredTypes() {
+        /** With ignoring Types */
+        /** Assumption is only one message received */
+        val withSameCorrelationOneMessages = MessagePrioritizationSample
+                .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 1)
+        val withSameCorrelationOneMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+                withSameCorrelationOneMessages, null)
+        assertTrue(!withSameCorrelationOneMessagesResp.correlated,
+                "failed to correlate withSameCorrelationMessagesResp")
+
+        /** Assumption is two message received for same group with same correlation */
+        val withSameCorrelationMessages = MessagePrioritizationSample
+                .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 2)
+        val withSameCorrelationMessagesResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+                withSameCorrelationMessages, null)
+        assertTrue(withSameCorrelationMessagesResp.correlated,
+                "failed to correlate withSameCorrelationMessagesResp")
+    }
+
+    @Test
+    fun differentTypesWithDifferentCorrelationMessage() {
+        /** Assumption is two message received for same group with different expected types and different correlation */
+        val message1 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+                "type-0", "key1=value1,key2=value2")
+        val message2 = MessagePrioritizationSample.createMessage("sample-group", MessageState.NEW.name,
+                "type-1", "key1=value1,key2=value3")
+        val differentTypesWithDifferentCorrelationMessage: MutableList<MessagePrioritization> = arrayListOf()
+        differentTypesWithDifferentCorrelationMessage.add(message1)
+        differentTypesWithDifferentCorrelationMessage.add(message2)
+        val differentTypesWithDifferentCorrelationMessageResp = MessageCorrelationUtils.correlatedMessagesWithTypes(
+                differentTypesWithDifferentCorrelationMessage,
+                arrayListOf("type-0", "type-1"))
+        assertTrue(!differentTypesWithDifferentCorrelationMessageResp.correlated,
+                "failed to correlate differentTypesWithDifferentCorrelationMessageResp")
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/resources/logback-test.xml
new file mode 100644 (file)
index 0000000..e3a1f7a
--- /dev/null
@@ -0,0 +1,42 @@
+<!--
+  ~  Copyright © 2018-2019 AT&T Intellectual Property.
+  ~
+  ~  Licensed under the Apache License, Version 2.0 (the "License");
+  ~  you may not use this file except in compliance with the License.
+  ~  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+
+    <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
+    <property name="defaultPattern"
+              value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
+    <property name="testing"
+              value="%X{RequestID}|%X{InvocationID}|%logger{50}| %msg%n"/>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <!-- encoders are assigned the type
+             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>${localPattern}</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.springframework.test" level="warn"/>
+    <logger name="org.springframework" level="warn"/>
+    <logger name="org.hibernate.type.descriptor.sql" level="warn"/>
+    <logger name="org.onap.ccsdk.cds.blueprintsprocessor" level="info"/>
+
+    <root level="warn">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+</configuration>
index 3ee6737..38f9071 100755 (executable)
@@ -39,6 +39,7 @@
         <module>restconf-executor</module>
         <module>cli-executor</module>
         <module>config-snapshots</module>
+        <module>message-prioritizaion</module>
     </modules>
 
     <dependencies>
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/AbstractKafkaTopologyComponents.kt
new file mode 100644 (file)
index 0000000..4c6c0ac
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.Punctuator
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+/** CDS Kafka Stream Processor abstract class to implement */
+abstract class AbstractBluePrintMessageProcessor<K, V> : Processor<K, V> {
+
+    private val log = logger(AbstractBluePrintMessageProcessor::class)
+
+    lateinit var processorContext: ProcessorContext
+
+
+    override fun process(key: K, value: V) = runBlocking(Dispatchers.IO) {
+        try {
+            processNB(key, value)
+        } catch (e: Exception) {
+            log.error("failed in processor(${this.javaClass.simpleName}) message(${this.javaClass.simpleName} :", e)
+        }
+    }
+
+    override fun init(context: ProcessorContext) {
+        log.info("initializing processor (${this.javaClass.simpleName})")
+        this.processorContext = context
+
+    }
+
+    override fun close() {
+        log.info("closing processor (${this.javaClass.simpleName})")
+    }
+
+    abstract suspend fun processNB(key: K, value: V)
+}
+
+/** CDS Kafka Stream Punctuator abstract class to implement */
+abstract class AbstractBluePrintMessagePunctuator : Punctuator {
+    lateinit var processorContext: ProcessorContext
+
+
+    override fun punctuate(timestamp: Long) = runBlocking(Dispatchers.IO) {
+        punctuateNB(timestamp)
+    }
+
+    abstract suspend fun punctuateNB(timestamp: Long)
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/kafka/KafkaJDBCStores.kt
new file mode 100644 (file)
index 0000000..86ccd74
--- /dev/null
@@ -0,0 +1,143 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
+
+/*
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.StateStore
+import org.apache.kafka.streams.state.StoreBuilder
+import org.apache.kafka.streams.state.StoreSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.db.BluePrintDBLibGenericService
+import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+import java.util.*
+
+
+class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier<KafkaJDBCStore> {
+
+    override fun get(): KafkaJDBCStore {
+        // Get the DBLibGenericService Instance
+        val bluePrintDBLibGenericService = BluePrintDependencyService.primaryDBLibGenericService()
+        return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService)
+    }
+
+    override fun name(): String {
+        return name
+    }
+
+    override fun metricsScope(): String {
+        return "jdbc-state"
+    }
+}
+
+class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier)
+    : StoreBuilder<KafkaJDBCStore> {
+
+    private var logConfig: MutableMap<String, String> = HashMap()
+    private var enableCaching: Boolean = false
+    private var enableLogging = true
+
+    override fun logConfig(): MutableMap<String, String> {
+        return logConfig
+    }
+
+    override fun withCachingDisabled(): StoreBuilder<KafkaJDBCStore> {
+        enableCaching = false
+        return this
+    }
+
+    override fun loggingEnabled(): Boolean {
+        return enableLogging
+    }
+
+    override fun withLoggingDisabled(): StoreBuilder<KafkaJDBCStore> {
+        enableLogging = false
+        return this
+    }
+
+    override fun withCachingEnabled(): StoreBuilder<KafkaJDBCStore> {
+        enableCaching = true
+        return this
+    }
+
+    override fun withLoggingEnabled(config: MutableMap<String, String>?): StoreBuilder<KafkaJDBCStore> {
+        enableLogging = true
+        return this
+    }
+
+    override fun name(): String {
+        return "KafkaJDBCKeyStoreBuilder"
+    }
+
+    override fun build(): KafkaJDBCStore {
+        return storeSupplier.get()
+    }
+}
+
+interface KafkaJDBCStore : StateStore {
+
+    suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>>
+
+    suspend fun update(sql: String, params: Map<String, Any>): Int
+}
+
+
+class KafkaJDBCStoreImpl(private val name: String,
+                         private val bluePrintDBLibGenericService: BluePrintDBLibGenericService)
+    : KafkaJDBCStore {
+
+    private val log = logger(KafkaJDBCStoreImpl::class)
+
+    override fun isOpen(): Boolean {
+        log.info("isOpen...")
+        return true
+    }
+
+    override fun init(context: ProcessorContext, root: StateStore) {
+        log.info("init...")
+    }
+
+    override fun flush() {
+        log.info("flush...")
+    }
+
+    override fun close() {
+        log.info("Close...")
+    }
+
+    override fun name(): String {
+        return name
+    }
+
+    override fun persistent(): Boolean {
+        return true
+    }
+
+    override suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> {
+        log.info("Query : $sql")
+        log.info("Params : $params")
+        return bluePrintDBLibGenericService.query(sql, params)
+    }
+
+    override suspend fun update(sql: String, params: Map<String, Any>): Int {
+        log.info("Query : $sql")
+        log.info("Params : $params")
+        return bluePrintDBLibGenericService.update(sql, params)
+    }
+}
+*/
index 229e462..d0297df 100644 (file)
@@ -56,6 +56,7 @@ open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerPrope
         val streamsConfig = streamsConfig(additionalConfig)
         val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction
         val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig)
+        log.info("Kafka streams topology : ${topology.describe()}")
         kafkaStreams = KafkaStreams(topology, streamsConfig)
         kafkaStreams.cleanUp()
         kafkaStreams.start()