Prioritization Optional NATS consumer support 82/100382/2
authorBrinda Santh <bs2796@att.com>
Thu, 16 Jan 2020 16:21:50 +0000 (11:21 -0500)
committerBrinda Santh <bs2796@att.com>
Thu, 16 Jan 2020 18:58:05 +0000 (13:58 -0500)
Add prioritization NATS consumer service and configuration data beans.

Optimizing message prioritization service interface.

Added Integration testing for NATS simulation.

Updated sample docker compose for NATS support

Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Icd21e5e2ab7b64d6e6e4b0610599ca947555ee15

26 files changed:
ms/blueprintsprocessor/application/src/main/dc/docker-compose-cluster.yaml
ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractMessagePrioritizeProcessor.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/KafkaMessagePrioritizationConsumer.kt [moved from ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt with 90% similarity]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/TestConfiguration.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/service/BluePrintDependencyService.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/ClusterUtils.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibConfiguration.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/BluePrintNatsLibData.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyService.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/TokenAuthNatsService.kt
ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsServiceTest.kt

index 020038c..7159534 100644 (file)
@@ -22,13 +22,17 @@ services:
     image: nats-streaming:latest
     container_name: nats
     hostname: nats
-    command: "-cid cds-cluster --auth tokenAuth -store file -dir store-nats-0 --cluster_node_id nats-0"
+    command: "-cid cds-cluster --auth tokenAuth -store file -dir /opt/app/onap/nats/store --cluster_node_id nats-0"
     networks:
       - cds-network
     ports:
       - "8222:8222"
       - "4222:4222"
     restart: always
+    volumes:
+      - target: /opt/app/onap/nats/store
+        type: volume
+        source: nats-store
   cds-controller-0:
     depends_on:
       - db
@@ -57,6 +61,7 @@ services:
       CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
       CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
       #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+      NATS_CLUSTER_ID: cds-cluster
       NATS_HOSTS: nats://nats:4222
       APPLICATIONNAME: cds-controller
       BUNDLEVERSION: 1.0.0
@@ -90,6 +95,7 @@ services:
       CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0
       CLUSTER_STORAGE_PATH: /opt/app/onap/config/cluster
       #CLUSTER_CONFIG_FILE: /opt/app/onap/config/atomix/atomix-multicast.conf
+      NATS_CLUSTER_ID: cds-cluster
       NATS_HOSTS: nats://nats:4222
       APPLICATIONNAME: resource-resolution
       BUNDLEVERSION: 1.0.0
@@ -116,6 +122,7 @@ services:
       CLUSTER_ID: cds-cluster
       CLUSTER_NODE_ID: py-executor-0
       CLUSTER_MEMBERS: cds-controller-0,resource-resolution-0,py-executor-0
+      NATS_CLUSTER_ID: cds-cluster
       NATS_HOSTS: nats://nats:4222
       APPLICATIONNAME: py-executor
       BUNDLEVERSION: 1.0.0
@@ -132,6 +139,12 @@ volumes:
       type: none
       device: /opt/app/cds/mysql/data
       o: bind
+  nats-store:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/nats/nats-0/store
+      o: bind
   blueprints-deploy:
     driver: local
     driver_opts:
@@ -142,13 +155,13 @@ volumes:
     driver: local
     driver_opts:
       type: none
-      device: /opt/app/cds/cds-controller/config
+      device: /opt/app/cds/cds-controller/cds-controller-0/config
       o: bind
   resource-resolution-config:
     driver: local
     driver_opts:
       type: none
-      device: /opt/app/cds/resource-resolution/config
+      device: /opt/app/cds/resource-resolution/resource-resolution-0/config
       o: bind
 
 networks:
index 20b17bc..8f2a786 100755 (executable)
@@ -18,6 +18,21 @@ services:
       MYSQL_DATABASE: sdnctl
       MYSQL_USER: sdnctl
       MYSQL_PASSWORD: sdnctl
+  nats:
+    image: nats-streaming:latest
+    container_name: nats
+    hostname: nats
+    command: "-cid cds-cluster --auth tokenAuth -store file -dir /opt/app/onap/nats/store --cluster_node_id nats"
+    networks:
+      - cds-network
+    ports:
+      - "8222:8222"
+      - "4222:4222"
+    restart: always
+    volumes:
+      - target: /opt/app/onap/nats/store
+        type: volume
+        source: nats-store
   cds-controller-default:
     depends_on:
       - db
@@ -89,6 +104,12 @@ volumes:
       type: none
       device: /opt/app/cds/mysql/data
       o: bind
+  nats-store:
+    driver: local
+    driver_opts:
+      type: none
+      device: /opt/app/cds/nats/store
+      o: bind
   blueprints-deploy:
     driver: local
     driver_opts:
index 8345df5..424929b 100644 (file)
@@ -37,6 +37,7 @@ open class PrioritizationConfiguration : Serializable {
     lateinit var shutDownConfiguration: ShutDownConfiguration
     lateinit var cleanConfiguration: CleanConfiguration
     var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration
+    var natsConfiguration: NatsConfiguration? = null // Optional NATS Consumer Configuration
 }
 
 open class KafkaConfiguration : Serializable {
@@ -45,6 +46,13 @@ open class KafkaConfiguration : Serializable {
     lateinit var outputTopic: String // Publish Configuration Selector
 }
 
+open class NatsConfiguration : Serializable {
+    lateinit var connectionSelector: String // Consumer Configuration Selector
+    lateinit var inputSubject: String // Publish Configuration Selector
+    lateinit var expiredSubject: String // Publish Configuration Selector
+    lateinit var outputSubject: String // Publish Configuration Selector
+}
+
 open class ExpiryConfiguration : Serializable {
     var frequencyMilli: Long = 30000L
     var maxPollRecord: Int = 1000
index 464f97a..dfe5169 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
-import org.apache.kafka.streams.processor.ProcessorContext
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 
 interface MessagePrioritizationService {
 
-    fun setKafkaProcessorContext(processorContext: ProcessorContext?)
+    fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration)
+
+    fun getConfiguration(): PrioritizationConfiguration
 
     suspend fun prioritize(messagePrioritization: MessagePrioritization)
 
     /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */
     suspend fun output(messages: List<MessagePrioritization>)
 
-    /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */
-    suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration)
+    /** Scheduler service will use this method for updating the expired messages based on the expiryConfiguration */
+    suspend fun updateExpiredMessages()
 
-    /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */
-    suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration)
+    /** Scheduler service will use this method for clean the expired messages based on the cleanConfiguration */
+    suspend fun cleanExpiredMessage()
 }
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/AbstractKafkaMessagePrioritizationService.kt
new file mode 100644 (file)
index 0000000..112a803
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractKafkaMessagePrioritizationService(
+    private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+    private val log = logger(AbstractKafkaMessagePrioritizationService::class)
+
+    lateinit var processorContext: ProcessorContext
+
+    fun setKafkaProcessorContext(processorContext: ProcessorContext) {
+        this.processorContext = processorContext
+    }
+
+    override suspend fun output(messages: List<MessagePrioritization>) {
+        log.info("$$$$$ received in output processor id(${messages.ids()})")
+        checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+        check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+        messages.forEach { message ->
+            val updatedMessage =
+                messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+            processorContext.forward(
+                updatedMessage.id,
+                updatedMessage,
+                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+            )
+        }
+    }
+
+    override suspend fun updateExpiredMessages() {
+        checkNotNull(prioritizationConfiguration.kafkaConfiguration) { "failed to initialize kafka configuration" }
+        check(::processorContext.isInitialized) { "failed to initialize kafka processor " }
+
+        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+        val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+        try {
+            val fetchMessages = messagePrioritizationStateService
+                .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+            val expiredIds = fetchMessages?.ids()
+            if (expiredIds != null && expiredIds.isNotEmpty()) {
+                messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+                fetchMessages.forEach { expiredMessage ->
+                    expiredMessage.state = MessageState.EXPIRED.name
+                    processorContext.forward(
+                        expiredMessage.id, expiredMessage,
+                        To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+                    )
+                }
+            }
+        } catch (e: Exception) {
+            log.error("failed in updating expired messages", e)
+        } finally {
+            MessageProcessorUtils.prioritizationUnLock(clusterLock)
+        }
+    }
+}
index 656646f..d4f8470 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
 
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
 
 /** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
 abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
 
-    private val log = logger(AbstractMessagePrioritizeProcessor::class)
-
-    lateinit var prioritizationConfiguration: PrioritizationConfiguration
-
     override fun init(processorContext: ProcessorContext) {
         this.processorContext = processorContext
     }
index 624a69f..1b06124 100644 (file)
@@ -29,7 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
 
 open class DefaultMessagePrioritizeProcessor(
     private val messagePrioritizationStateService: MessagePrioritizationStateService,
-    private val messagePrioritizationService: MessagePrioritizationService
+    private val kafkaMessagePrioritizationService: MessagePrioritizationService
 ) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
 
     private val log = logger(DefaultMessagePrioritizeProcessor::class)
@@ -39,7 +39,7 @@ open class DefaultMessagePrioritizeProcessor(
         val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
             ?: throw BluePrintProcessorException("failed to convert")
         try {
-            messagePrioritizationService.prioritize(messagePrioritize)
+            kafkaMessagePrioritizationService.prioritize(messagePrioritize)
         } catch (e: Exception) {
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
             log.error(messagePrioritize.error)
@@ -59,7 +59,14 @@ open class DefaultMessagePrioritizeProcessor(
     override fun init(context: ProcessorContext) {
         super.init(context)
         /** Set Configuration and Processor Context to messagePrioritizationService */
-        messagePrioritizationService.setKafkaProcessorContext(processorContext)
+        if (kafkaMessagePrioritizationService is AbstractKafkaMessagePrioritizationService) {
+            kafkaMessagePrioritizationService.setKafkaProcessorContext(processorContext)
+        } else {
+            throw BluePrintProcessorException(
+                "messagePrioritizationService is not instance of " +
+                    "AbstractKafkaMessagePrioritizationService, it is ${kafkaMessagePrioritizationService.javaClass}"
+            )
+        }
     }
 
     override fun close() {
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
 import org.apache.kafka.common.serialization.Serdes
 import org.apache.kafka.streams.Topology
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
@@ -30,13 +31,14 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
 
-open class MessagePrioritizationConsumer(
-    private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+open class KafkaMessagePrioritizationConsumer(
+    private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
+    private val kafkaMessagePrioritizationService: MessagePrioritizationService
 ) {
 
-    private val log = logger(MessagePrioritizationConsumer::class)
+    private val log = logger(KafkaMessagePrioritizationConsumer::class)
 
-    lateinit var streamingConsumerService: BlueprintMessageConsumerService
+    private lateinit var streamingConsumerService: BlueprintMessageConsumerService
 
     open fun consumerService(selector: String): BlueprintMessageConsumerService {
         return bluePrintMessageLibPropertyService
@@ -67,8 +69,7 @@ open class MessagePrioritizationConsumer(
                 topology.addProcessor(
                     MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                     bluePrintProcessorSupplier<ByteArray, ByteArray>(
-                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
-                        prioritizationConfiguration
+                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                     ),
                     MessagePrioritizationConstants.SOURCE_INPUT
                 )
@@ -100,7 +101,7 @@ open class MessagePrioritizationConsumer(
     }
 
     suspend fun shutDown() {
-        if (streamingConsumerService != null) {
+        if (::streamingConsumerService.isInitialized) {
             streamingConsumerService.shutDown()
         }
     }
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/AbstractNatsMessagePrioritizationService.kt
new file mode 100644 (file)
index 0000000..502a782
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.AbstractMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+abstract class AbstractNatsMessagePrioritizationService(
+    private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+    private val log = logger(AbstractNatsMessagePrioritizationService::class)
+
+    lateinit var bluePrintNatsService: BluePrintNatsService
+
+    override suspend fun output(messages: List<MessagePrioritization>) {
+        log.info("$$$$$ received in output processor id(${messages.ids()})")
+        checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+        check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+        val outputSubject = prioritizationConfiguration.natsConfiguration!!.outputSubject
+        messages.forEach { message ->
+            val updatedMessage =
+                messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+
+            /** send to the output subject */
+            bluePrintNatsService.publish(
+                NatsClusterUtils.currentApplicationSubject(outputSubject),
+                updatedMessage.asJsonType().asByteArray()
+            )
+        }
+    }
+
+    override suspend fun updateExpiredMessages() {
+        checkNotNull(prioritizationConfiguration.natsConfiguration) { "failed to initialize NATS configuration" }
+        check(::bluePrintNatsService.isInitialized) { "failed to initialize NATS services" }
+
+        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+        val outputSubject = prioritizationConfiguration.natsConfiguration!!.expiredSubject
+        val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+        try {
+            val fetchMessages = messagePrioritizationStateService
+                .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+            val expiredIds = fetchMessages?.ids()
+            if (!expiredIds.isNullOrEmpty()) {
+                messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+                fetchMessages.forEach { expiredMessage ->
+                    expiredMessage.state = MessageState.EXPIRED.name
+                    /** send to the output subject */
+                    bluePrintNatsService.publish(
+                        NatsClusterUtils.currentApplicationSubject(outputSubject),
+                        expiredMessage.asJsonType().asByteArray()
+                    )
+                }
+            }
+        } catch (e: Exception) {
+            log.error("failed in updating expired messages", e)
+        } finally {
+            MessageProcessorUtils.prioritizationUnLock(clusterLock)
+        }
+    }
+}
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/nats/NatsMessagePrioritizationConsumer.kt
new file mode 100644 (file)
index 0000000..20da2c2
--- /dev/null
@@ -0,0 +1,91 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats
+
+import io.nats.streaming.MessageHandler
+import io.nats.streaming.Subscription
+import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.asJsonType
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.SubscriptionOptionsUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+open class NatsMessagePrioritizationConsumer(
+    private val bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService,
+    private val natsMessagePrioritizationService: MessagePrioritizationService
+) {
+    private val log = logger(NatsMessagePrioritizationConsumer::class)
+
+    lateinit var bluePrintNatsService: BluePrintNatsService
+    private lateinit var subscription: Subscription
+
+    suspend fun startConsuming() {
+        val prioritizationConfiguration = natsMessagePrioritizationService.getConfiguration()
+        val natsConfiguration = prioritizationConfiguration.natsConfiguration
+            ?: throw BluePrintProcessorException("couldn't get NATS consumer configuration")
+
+        check((natsMessagePrioritizationService is AbstractNatsMessagePrioritizationService)) {
+            "messagePrioritizationService is not of type AbstractNatsMessagePrioritizationService."
+        }
+        bluePrintNatsService = consumerService(natsConfiguration.connectionSelector)
+        natsMessagePrioritizationService.bluePrintNatsService = bluePrintNatsService
+        val inputSubject = NatsClusterUtils.currentApplicationSubject(natsConfiguration.inputSubject)
+        val loadBalanceGroup = ClusterUtils.applicationName()
+        val messageHandler = createMessageHandler()
+        val subscriptionOptions = SubscriptionOptionsUtils.durable(NatsClusterUtils.currentNodeDurable(inputSubject))
+        subscription = bluePrintNatsService.loadBalanceSubscribe(
+            inputSubject,
+            loadBalanceGroup,
+            messageHandler,
+            subscriptionOptions
+        )
+        log.info(
+            "Nats prioritization consumer listening on subject($inputSubject) on loadBalance group($loadBalanceGroup)."
+        )
+    }
+
+    suspend fun shutDown() {
+        if (::subscription.isInitialized) {
+            subscription.unsubscribe()
+        }
+        log.info("Nats prioritization consumer listener shutdown complete")
+    }
+
+    private fun consumerService(selector: String): BluePrintNatsService {
+        return bluePrintNatsLibPropertyService.bluePrintNatsService(selector)
+    }
+
+    private fun createMessageHandler(): MessageHandler {
+        return MessageHandler { message ->
+            try {
+                val messagePrioritization = message.asJsonType().asType(MessagePrioritization::class.java)
+                runBlocking {
+                    natsMessagePrioritizationService.prioritize(messagePrioritization)
+                }
+            } catch (e: Exception) {
+                log.error("failed to process prioritize message", e)
+            }
+        }
+    }
+}
index 9314032..a6963d8 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
 
-import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
@@ -37,15 +33,21 @@ abstract class AbstractMessagePrioritizationService(
 
     private val log = logger(AbstractMessagePrioritizationService::class)
 
-    var processorContext: ProcessorContext? = null
+    lateinit var prioritizationConfiguration: PrioritizationConfiguration
 
-    override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
-        this.processorContext = processorContext
+    override fun setConfiguration(prioritizationConfiguration: PrioritizationConfiguration) {
+        this.prioritizationConfiguration = prioritizationConfiguration
+    }
+
+    override fun getConfiguration(): PrioritizationConfiguration {
+        return this.prioritizationConfiguration
     }
 
     override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
         try {
             log.info("***** received in prioritize processor key(${messagePrioritize.id})")
+            check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
             /** Get the cluster lock for message group */
             val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
             // Save the Message
@@ -67,31 +69,21 @@ abstract class AbstractMessagePrioritizationService(
     override suspend fun output(messages: List<MessagePrioritization>) {
         log.info("$$$$$ received in output processor id(${messages.ids()})")
         messages.forEach { message ->
-            val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
-            /** Check for Kafka Processing, If yes, then send to the output topic */
-            if (this.processorContext != null) {
-                processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
-            }
+            messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
         }
     }
 
-    override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+    override suspend fun updateExpiredMessages() {
+        check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
         val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
         try {
             val fetchMessages = messagePrioritizationStateService
                 .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
             val expiredIds = fetchMessages?.ids()
-            if (expiredIds != null && expiredIds.isNotEmpty()) {
+            if (!expiredIds.isNullOrEmpty()) {
                 messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
-                if (processorContext != null) {
-                    fetchMessages.forEach { expired ->
-                        expired.state = MessageState.EXPIRED.name
-                        processorContext!!.forward(
-                            expired.id, expired,
-                            To.child(MessagePrioritizationConstants.SINK_OUTPUT)
-                        )
-                    }
-                }
             }
         } catch (e: Exception) {
             log.error("failed in updating expired messages", e)
@@ -100,7 +92,10 @@ abstract class AbstractMessagePrioritizationService(
         }
     }
 
-    override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+    override suspend fun cleanExpiredMessage() {
+        check(::prioritizationConfiguration.isInitialized) { "failed to initialize prioritizationConfiguration " }
+
+        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
         val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
         try {
             messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
index b1c1fb1..2f08c1c 100644 (file)
@@ -43,7 +43,9 @@ open class MessagePrioritizationSchedulerService(
     }
     */
 
-    open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
+    open suspend fun startScheduling() {
+        val prioritizationConfiguration = messagePrioritizationService.getConfiguration()
+
         log.info("Starting Prioritization Scheduler Service...")
         GlobalScope.launch {
             expiryScheduler(prioritizationConfiguration)
@@ -66,7 +68,7 @@ open class MessagePrioritizationSchedulerService(
         withContext(Dispatchers.Default) {
             while (keepGoing) {
                 try {
-                    messagePrioritizationService.updateExpiredMessages(expiryConfiguration)
+                    messagePrioritizationService.updateExpiredMessages()
                     delay(expiryConfiguration.frequencyMilli)
                 } catch (e: Exception) {
                     log.error("failed in prioritization expiry scheduler", e)
@@ -83,7 +85,7 @@ open class MessagePrioritizationSchedulerService(
         withContext(Dispatchers.Default) {
             while (keepGoing) {
                 try {
-                    messagePrioritizationService.cleanExpiredMessage(cleanConfiguration)
+                    messagePrioritizationService.cleanExpiredMessage()
                     delay(cleanConfiguration.frequencyMilli)
                 } catch (e: Exception) {
                     log.error("failed in prioritization clean scheduler", e)
index b7d878e..305e64b 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
 
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.AbstractNatsMessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 
@@ -28,24 +30,90 @@ import org.onap.ccsdk.cds.controllerblueprints.core.logger
 open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
     AbstractMessagePrioritizationService(messagePrioritizationStateService) {
 
-    private val log = logger(DefaultMessagePrioritizeProcessor::class)
+    /** Child overriding this implementation , if necessary */
+    override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+        val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+            this, messagePrioritizationStateService
+        )
+        sampleMessagePrioritizationHandler.handleAggregation(messages)
+    }
+
+    /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+     * otherwise correlation happens with group and correlationId */
+    override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+        val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+            this, messagePrioritizationStateService
+        )
+        return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+    }
+}
+
+open class SampleKafkaMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+    AbstractKafkaMessagePrioritizationService(messagePrioritizationStateService) {
+
+    /** Child overriding this implementation , if necessary */
+    override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+        val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+            this, messagePrioritizationStateService
+        )
+        sampleMessagePrioritizationHandler.handleAggregation(messages)
+    }
+
+    /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+     * otherwise correlation happens with group and correlationId */
+    override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+        val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+            this, messagePrioritizationStateService
+        )
+        return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+    }
+}
+
+open class SampleNatsMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
+    AbstractNatsMessagePrioritizationService(messagePrioritizationStateService) {
 
     /** Child overriding this implementation , if necessary */
     override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+        val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+            this, messagePrioritizationStateService
+        )
+        sampleMessagePrioritizationHandler.handleAggregation(messages)
+    }
+
+    /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+     * otherwise correlation happens with group and correlationId */
+    override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+        val sampleMessagePrioritizationHandler = SampleMessagePrioritizationHandler(
+            this, messagePrioritizationStateService
+        )
+        return sampleMessagePrioritizationHandler.getGroupCorrelationTypes(messagePrioritization)
+    }
+}
+
+class SampleMessagePrioritizationHandler(
+    private val messagePrioritizationService: MessagePrioritizationService,
+    private val messagePrioritizationStateService: MessagePrioritizationStateService
+) {
+
+    private val log = logger(SampleMessagePrioritizationHandler::class)
+
+    suspend fun handleAggregation(messages: List<MessagePrioritization>) {
         log.info("messages(${messages.ids()}) aggregated")
         /** Sequence based on Priority and Updated Date */
         val sequencedMessage = messages.orderByHighestPriority()
         /** Update all messages to aggregated state */
-        messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name)
-        output(sequencedMessage)
+        messagePrioritizationStateService.setMessagesState(
+            sequencedMessage.ids(),
+            MessageState.AGGREGATED.name
+        )
+        messagePrioritizationService.output(sequencedMessage)
     }
 
-    /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
-     * otherwise correlation happens with group and correlationId */
-    override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+    fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
         return when (messagePrioritization.group) {
             /** Dummy Implementation, This can also be read from file and stored as cached map **/
             "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+            "pass-typed" -> arrayListOf(messagePrioritization.type)
             else -> null
         }
     }
index e497ef1..2c4ae30 100644 (file)
@@ -19,10 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.NatsConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import java.util.Calendar
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
 import java.util.Date
 import java.util.UUID
 
@@ -35,6 +37,12 @@ object MessagePrioritizationSample {
                 outputTopic = "prioritize-output-topic"
                 expiredTopic = "prioritize-expired-topic"
             }
+            natsConfiguration = NatsConfiguration().apply {
+                connectionSelector = "cds-controller"
+                inputSubject = "prioritize-input"
+                outputSubject = "prioritize-output"
+                expiredSubject = "prioritize-expired"
+            }
             expiryConfiguration = ExpiryConfiguration().apply {
                 frequencyMilli = 10000L
                 maxPollRecord = 2000
@@ -66,9 +74,7 @@ object MessagePrioritizationSample {
     }
 
     private fun currentDatePlusDays(days: Int): Date {
-        val calender = Calendar.getInstance()
-        calender.add(Calendar.DATE, days)
-        return calender.time
+        return controllerDate().addDate(days)
     }
 
     fun sampleMessages(messageState: String, count: Int): List<MessagePrioritization> {
index 18b3e4d..9100fb5 100644 (file)
@@ -19,7 +19,6 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
 import org.apache.kafka.streams.processor.ProcessorSupplier
 import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
@@ -77,14 +76,11 @@ object MessageProcessorUtils {
         }
     }
 
-    /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/
-    fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
-        ProcessorSupplier<K, V> {
+    /** Get the Kafka Supplier for processor lookup [name] **/
+    fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> {
         return ProcessorSupplier<K, V> {
             // Dynamically resolve the Prioritization Processor
-            val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
-            processorInstance.prioritizationConfiguration = prioritizationConfiguration
-            processorInstance
+            BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
         }
     }
 }
index 190f4e8..7f150f5 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
 import io.mockk.coEvery
 import io.mockk.every
+import io.mockk.mockk
 import io.mockk.spyk
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
@@ -27,13 +28,23 @@ import org.junit.runner.RunWith
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.KafkaMessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.nats.NatsMessagePrioritizationConsumer
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleKafkaMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleNatsMessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.service.BluePrintNatsLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
+import org.onap.ccsdk.cds.controllerblueprints.core.asByteArray
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonType
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 import org.springframework.beans.factory.annotation.Autowired
@@ -45,21 +56,20 @@ import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
 import kotlin.test.Test
 import kotlin.test.assertNotNull
-import kotlin.test.assertTrue
 
 @RunWith(SpringRunner::class)
 @DataJpaTest
 @DirtiesContext
 @ContextConfiguration(
-    classes = [BluePrintMessageLibConfiguration::class,
+    classes = [BluePrintMessageLibConfiguration::class, BluePrintNatsLibConfiguration::class,
         BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class,
         MessagePrioritizationConfiguration::class, TestDatabaseConfiguration::class]
 )
 @TestPropertySource(
     properties =
     [
-        "spring.jpa.show-sql=true",
-        "spring.jpa.properties.hibernate.show_sql=true",
+        "spring.jpa.show-sql=false",
+        "spring.jpa.properties.hibernate.show_sql=false",
         "spring.jpa.hibernate.naming.physical-strategy=org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl",
 
         "blueprintsprocessor.messageconsumer.prioritize-input.type=kafka-streams-basic-auth",
@@ -70,7 +80,11 @@ import kotlin.test.assertTrue
         // To send initial test message
         "blueprintsprocessor.messageproducer.prioritize-input.type=kafka-basic-auth",
         "blueprintsprocessor.messageproducer.prioritize-input.bootstrapServers=127.0.0.1:9092",
-        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic"
+        "blueprintsprocessor.messageproducer.prioritize-input.topic=prioritize-input-topic",
+
+        "blueprintsprocessor.nats.cds-controller.type=token-auth",
+        "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+        "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
     ]
 )
 open class MessagePrioritizationConsumerTest {
@@ -87,13 +101,10 @@ open class MessagePrioritizationConsumerTest {
     lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
 
     @Autowired
-    lateinit var messagePrioritizationService: MessagePrioritizationService
-
-    @Autowired
-    lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
+    lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
 
     @Autowired
-    lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
+    lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
 
     @Before
     fun setup() {
@@ -120,10 +131,10 @@ open class MessagePrioritizationConsumerTest {
     @Test
     fun testMessagePrioritizationService() {
         runBlocking {
-            assertTrue(
-                ::messagePrioritizationService.isInitialized,
-                "failed to initialize messagePrioritizationService"
-            )
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+            val messagePrioritizationService =
+                SampleMessagePrioritizationService(messagePrioritizationStateService)
+            messagePrioritizationService.setConfiguration(configuration)
 
             log.info("****************  without Correlation **************")
             /** Checking without correlation */
@@ -161,8 +172,8 @@ open class MessagePrioritizationConsumerTest {
             val spyStreamingConsumerService = spyk(streamingConsumerService)
             coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
             coEvery { spyStreamingConsumerService.shutDown() } returns Unit
-            val messagePrioritizationConsumer = MessagePrioritizationConsumer(
-                bluePrintMessageLibPropertyService
+            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+                bluePrintMessageLibPropertyService, mockk()
             )
             val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
 
@@ -183,13 +194,15 @@ open class MessagePrioritizationConsumerTest {
     @Test
     fun testSchedulerService() {
         runBlocking {
-            val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
-            assertTrue(
-                ::messagePrioritizationSchedulerService.isInitialized,
-                "failed to initialize messagePrioritizationSchedulerService"
-            )
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+            val messagePrioritizationService =
+                SampleMessagePrioritizationService(messagePrioritizationStateService)
+            messagePrioritizationService.setConfiguration(configuration)
+
+            val messagePrioritizationSchedulerService =
+                MessagePrioritizationSchedulerService(messagePrioritizationService)
             launch {
-                messagePrioritizationSchedulerService.startScheduling(configuration)
+                messagePrioritizationSchedulerService.startScheduling()
             }
             launch {
                 /** To debug increase the delay time */
@@ -201,9 +214,30 @@ open class MessagePrioritizationConsumerTest {
 
     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
     // @Test
-    fun testMessagePrioritizationConsumer() {
+    fun testKafkaMessagePrioritizationConsumer() {
         runBlocking {
-            messagePrioritizationConsumer.startConsuming(MessagePrioritizationSample.samplePrioritizationConfiguration())
+
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+            val kafkaMessagePrioritizationService =
+                SampleKafkaMessagePrioritizationService(messagePrioritizationStateService)
+            kafkaMessagePrioritizationService.setConfiguration(configuration)
+
+            val defaultMessagePrioritizeProcessor = DefaultMessagePrioritizeProcessor(
+                messagePrioritizationStateService,
+                kafkaMessagePrioritizationService
+            )
+
+            // Register the processor
+            BluePrintDependencyService.registerSingleton(
+                MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+                defaultMessagePrioritizeProcessor
+            )
+
+            val messagePrioritizationConsumer = KafkaMessagePrioritizationConsumer(
+                bluePrintMessageLibPropertyService,
+                kafkaMessagePrioritizationService
+            )
+            messagePrioritizationConsumer.startConsuming(configuration)
 
             /** Send sample message with every 1 sec */
             val blueprintMessageProducerService = bluePrintMessageLibPropertyService
@@ -247,4 +281,53 @@ open class MessagePrioritizationConsumerTest {
             messagePrioritizationConsumer.shutDown()
         }
     }
+
+    /** Integration Nats Testing, Enable and use this test case only for local desktop testing with real kafka broker
+     *  Start :
+     *  nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
+     * */
+    // @Test
+    fun testNatsMessagePrioritizationConsumer() {
+        runBlocking {
+            val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
+            assertNotNull(configuration.natsConfiguration, "failed to get nats Configuration")
+
+            val inputSubject =
+                NatsClusterUtils.currentApplicationSubject(configuration.natsConfiguration!!.inputSubject)
+
+            val natsMessagePrioritizationService =
+                SampleNatsMessagePrioritizationService(messagePrioritizationStateService)
+            natsMessagePrioritizationService.setConfiguration(configuration)
+
+            val messagePrioritizationConsumer =
+                NatsMessagePrioritizationConsumer(bluePrintNatsLibPropertyService, natsMessagePrioritizationService)
+            messagePrioritizationConsumer.startConsuming()
+
+            /** Send sample message with every 1 sec */
+            val bluePrintNatsService = messagePrioritizationConsumer.bluePrintNatsService
+
+            launch {
+                MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+                    delay(100)
+                    bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+                }
+
+                MessagePrioritizationSample
+                    .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+                    .forEach {
+                        delay(100)
+                        bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+                    }
+
+                MessagePrioritizationSample
+                    .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+                    .forEach {
+                        delay(200)
+                        bluePrintNatsService.publish(inputSubject, it.asJsonType().asByteArray())
+                    }
+            }
+            delay(3000)
+            messagePrioritizationConsumer.shutDown()
+        }
+    }
 }
index 0285079..22c3996 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
 import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
-import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration
 import org.springframework.context.annotation.Bean
 import org.springframework.context.annotation.ComponentScan
@@ -66,15 +63,3 @@ open class SamplePrioritizationListeners(private val defaultMessagePrioritizatio
 @Service
 open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
     SampleMessagePrioritizationService(messagePrioritizationStateService)
-
-/** For Kafka Consumer  **/
-@Service
-open class TestMessagePrioritizationConsumer(
-    bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
-) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
-
-@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class TestMessagePrioritizeProcessor(
-    messagePrioritizationStateService: MessagePrioritizationStateService,
-    messagePrioritizationService: MessagePrioritizationService
-) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService)
index df3bde1..e845728 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.controllerblueprints.core.service
 
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.springframework.context.ApplicationContext
+import org.springframework.context.ConfigurableApplicationContext
 import kotlin.reflect.KClass
 
 /**
@@ -36,6 +37,14 @@ object BluePrintDependencyService {
         BluePrintDependencyService.applicationContext = applicationContext
     }
 
+    /** Used to inject [instance] into spring application context for the [key],
+     * Use this method only for testing
+     * */
+    fun registerSingleton(key: String, instance: Any) {
+        val configurableApplicationContext = applicationContext as ConfigurableApplicationContext
+        configurableApplicationContext.beanFactory.registerSingleton(key, instance)
+    }
+
     inline fun <reified T> instance(name: String): T {
         return applicationContext.getBean(name) as? T
             ?: throw BluePrintProcessorException("failed to get instance($name)")
index b52cd71..7fe955b 100644 (file)
@@ -27,6 +27,10 @@ object ClusterUtils {
         return ip.hostName
     }
 
+    fun applicationName(): String {
+        return BluePrintConstants.APP_NAME
+    }
+
     fun clusterId(): String {
         return System.getenv(BluePrintConstants.PROPERTY_CLUSTER_ID) ?: "cds-cluster"
     }
index 147d360..8d5d846 100644 (file)
@@ -41,6 +41,7 @@ class NatsLibConstants {
         const val SERVICE_BLUEPRINT_NATS_LIB_PROPERTY = "blueprint-nats-lib-property-service"
         const val DEFULT_NATS_SELECTOR = "cds-controller"
         const val PROPERTY_NATS_PREFIX = "blueprintsprocessor.nats."
+        const val PROPERTY_NATS_CLUSTER_ID = "NATS_CLUSTER_ID"
         const val TYPE_TOKEN_AUTH = "token-auth"
         const val TYPE_TLS_AUTH = "tls-auth"
     }
index 9767ac2..74897f3 100644 (file)
 
 package org.onap.ccsdk.cds.blueprintsprocessor.nats
 
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.utils.NatsClusterUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
 
 open class NatsConnectionProperties {
     lateinit var type: String
-    var clusterId: String = ClusterUtils.clusterId()
+    var clusterId: String = NatsClusterUtils.clusterId()
     var clientId: String = ClusterUtils.clusterNodeId()
     lateinit var host: String
     /** Rest endpoint selector to access Monitoring API */
index faf1715..18d0639 100644 (file)
@@ -50,15 +50,13 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic
                 JacksonUtils.readValue(jsonNode, TLSAuthNatsConnectionProperties::class.java)!!
             }
             else -> {
-                throw BluePrintProcessorException("Nats type($type) not supported")
+                throw BluePrintProcessorException("NATS type($type) not supported")
             }
         }
     }
 
     fun natsConnectionProperties(prefix: String): NatsConnectionProperties {
-        val type = bluePrintPropertiesService.propertyBeanType(
-            "$prefix.type", String::class.java
-        )
+        val type = bluePrintPropertiesService.propertyBeanType("$prefix.type", String::class.java)
         return when (type) {
             NatsLibConstants.TYPE_TOKEN_AUTH -> {
                 tokenAuthNatsConnectionProperties(prefix)
@@ -67,7 +65,7 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic
                 tlsAuthNatsConnectionProperties(prefix)
             }
             else -> {
-                throw BluePrintProcessorException("Grpc type($type) not supported")
+                throw BluePrintProcessorException("NATS type($type) not supported")
             }
         }
     }
@@ -90,7 +88,7 @@ open class BluePrintNatsLibPropertyService(private var bluePrintPropertiesServic
                 TLSAuthNatsService(natsConnectionProperties)
             }
             else -> {
-                throw BluePrintProcessorException("couldn't get nats service for properties $natsConnectionProperties")
+                throw BluePrintProcessorException("couldn't get NATS service for properties $natsConnectionProperties")
             }
         }
     }
index 60b7934..43a43bc 100644 (file)
@@ -21,15 +21,23 @@ import io.nats.client.Options
 import io.nats.streaming.NatsStreaming
 import io.nats.streaming.StreamingConnection
 import org.onap.ccsdk.cds.blueprintsprocessor.nats.TokenAuthNatsConnectionProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
 
 open class TokenAuthNatsService(private val natsConnectionProperties: TokenAuthNatsConnectionProperties) :
     BluePrintNatsService {
 
+    private val log = logger(TokenAuthNatsService::class)
+
     lateinit var streamingConnection: StreamingConnection
 
     override suspend fun connection(): StreamingConnection {
         if (!::streamingConnection.isInitialized) {
+            log.info(
+                "NATS connection requesting for cluster(${natsConnectionProperties.clusterId}) with" +
+                    "clientId($natsConnectionProperties.clientId)"
+            )
+
             val serverList = natsConnectionProperties.host.splitCommaAsList()
 
             val options = Options.Builder()
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/utils/NatsClusterUtils.kt
new file mode 100644 (file)
index 0000000..a7726a1
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.utils
+
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.ClusterUtils
+
+object NatsClusterUtils {
+
+    fun clusterId(): String {
+        return System.getenv(NatsLibConstants.PROPERTY_NATS_CLUSTER_ID)
+            ?: ClusterUtils.clusterId()
+    }
+
+    fun applicationSubject(appName: String, subject: String): String {
+        return "$appName.$subject"
+    }
+
+    fun currentApplicationSubject(subject: String): String {
+        return "${BluePrintConstants.APP_NAME}.$subject"
+    }
+
+    fun currentNodeDurable(subject: String): String {
+        return "${ClusterUtils.clusterNodeId()}-$subject"
+    }
+
+    fun applicationLoadBalanceGroup(): String {
+        return "${BluePrintConstants.APP_NAME}"
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt b/ms/blueprintsprocessor/modules/commons/nats-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/nats/service/BluePrintNatsLibPropertyServiceTest.kt
new file mode 100644 (file)
index 0000000..ba993d9
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.nats.service
+
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.BluePrintNatsLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.nats.NatsLibConstants
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertTrue
+
+@RunWith(SpringRunner::class)
+@ContextConfiguration(
+    classes = [BluePrintNatsLibConfiguration::class,
+        BluePrintPropertyConfiguration::class, BluePrintPropertiesService::class]
+)
+@TestPropertySource(
+    properties =
+    ["blueprintsprocessor.nats.cds-controller.type=token-auth",
+        "blueprintsprocessor.nats.cds-controller.host=nats://localhost:4222",
+        "blueprintsprocessor.nats.cds-controller.token=tokenAuth"
+    ]
+)
+class BluePrintNatsLibPropertyServiceTest {
+
+    @Autowired
+    lateinit var bluePrintNatsLibPropertyService: BluePrintNatsLibPropertyService
+
+    @Test
+    fun testNatsProperties() {
+        assertTrue(::bluePrintNatsLibPropertyService.isInitialized)
+        bluePrintNatsLibPropertyService.bluePrintNatsService(NatsLibConstants.DEFULT_NATS_SELECTOR)
+    }
+}
index 549be64..721828a 100644 (file)
@@ -84,7 +84,7 @@ class BluePrintNatsServiceTest {
      * Start the Server with : nats-streaming-server -cid cds-cluster --auth tokenAuth -m 8222 -V
      */
     // @Test
-    fun localTntegrationTest() {
+    fun localIntegrationTest() {
         runBlocking {
 
             val connectionProperties = TokenAuthNatsConnectionProperties().apply {