Prioritization expiry and clean scheduler service 58/100258/4
authorBrinda Santh <bs2796@att.com>
Mon, 13 Jan 2020 16:37:17 +0000 (11:37 -0500)
committerKAPIL SINGAL <ks220y@att.com>
Mon, 13 Jan 2020 18:22:25 +0000 (18:22 +0000)
Add prioritization expiry and clean scheduler service implementation.

Optimizing message passing between processors.

Added message sorting utils.

Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I049ea3bae2e2c546244136f15c3d89deda1e7053

18 files changed:
ms/blueprintsprocessor/application/src/main/resources/logback.xml
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationService.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationStateService.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageRepository.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/DefaultMessagePrioritizeProcessor.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationConsumer.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt [deleted file]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/AbstractMessagePrioritizationService.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt [new file with mode: 0644]
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateServiceImpl.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/SampleMessagePrioritizationService.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessagePrioritizationSample.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageCorrelationUtilsTest.kt
ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/utils/DateUtils.kt

index d58be8a..63ede28 100644 (file)
     <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>
     <property name="defaultPattern"
               value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/>
-
-    <property name="colorPattern"
-              value="%${color}(%d{HH:mm:ss.SSS}|%X{RequestID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n)"/>
-
-    <appender name="SIFT" class="ch.qos.logback.classic.sift.SiftingAppender">
-        <discriminator class="org.onap.ccsdk.cds.blueprintsprocessor.uat.logging.SmartColorDiscriminator">
-            <defaultValue>white</defaultValue>
-        </discriminator>
-        <sift>
-            <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-                <encoder>
-                    <pattern>${defaultPattern}</pattern>
-                </encoder>
-            </appender>
-        </sift>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${defaultPattern}</pattern>
+        </encoder>
     </appender>
 
     <logger name="org.springframework" level="info"/>
@@ -43,7 +32,7 @@
     <logger name="org.onap.ccsdk.cds" level="info"/>
 
     <root level="info">
-        <appender-ref ref="SIFT"/>
+        <appender-ref ref="STDOUT"/>
     </root>
 
 </configuration>
index 3ecfa27..8345df5 100644 (file)
@@ -36,6 +36,10 @@ open class PrioritizationConfiguration : Serializable {
     lateinit var expiryConfiguration: ExpiryConfiguration
     lateinit var shutDownConfiguration: ShutDownConfiguration
     lateinit var cleanConfiguration: CleanConfiguration
+    var kafkaConfiguration: KafkaConfiguration? = null // Optional Kafka Consumer Configuration
+}
+
+open class KafkaConfiguration : Serializable {
     lateinit var inputTopicSelector: String // Consumer Configuration Selector
     lateinit var expiredTopic: String // Publish Configuration Selector
     lateinit var outputTopic: String // Publish Configuration Selector
index 584fd00..464f97a 100644 (file)
@@ -25,5 +25,12 @@ interface MessagePrioritizationService {
 
     suspend fun prioritize(messagePrioritization: MessagePrioritization)
 
-    suspend fun output(id: String)
+    /** Used to produce the prioritized or sequenced or aggregated message in Kafka topic or in database */
+    suspend fun output(messages: List<MessagePrioritization>)
+
+    /** Scheduler service will use this method for updating the expired messages based on the [expiryConfiguration] */
+    suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration)
+
+    /** Scheduler service will use this method for clean the expired messages based on the [cleanConfiguration] */
+    suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration)
 }
index 5dd41d7..2e5e6c6 100644 (file)
@@ -35,6 +35,8 @@ interface MessagePrioritizationStateService {
     suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
         List<MessagePrioritization>?
 
+    suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>?
+
     suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
 
     suspend fun getCorrelatedMessages(
@@ -60,9 +62,11 @@ interface MessagePrioritizationStateService {
 
     suspend fun deleteMessage(id: String)
 
+    suspend fun deleteMessages(id: List<String>)
+
+    suspend fun deleteExpiredMessage(retentionDays: Int)
+
     suspend fun deleteMessageByGroup(group: String)
 
     suspend fun deleteMessageStates(group: String, states: List<String>)
-
-    suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
 }
index 05b820a..d8e71d4 100644 (file)
@@ -50,3 +50,18 @@ fun MessagePrioritization.toFormatedCorrelation(): String {
 fun MessagePrioritization.toTypeNCorrelation(): TypeCorrelationKey {
     return TypeCorrelationKey(this.type, this.toFormatedCorrelation())
 }
+
+/** get list of message ids **/
+fun List<MessagePrioritization>.ids(): List<String> {
+    return this.map { it.id }
+}
+
+/** Ordered by highest priority and updated date **/
+fun List<MessagePrioritization>.orderByHighestPriority(): List<MessagePrioritization> {
+    return this.sortedWith(compareBy(MessagePrioritization::priority, MessagePrioritization::updatedDate))
+}
+
+/** Ordered by Updated date **/
+fun List<MessagePrioritization>.orderByUpdatedDate(): List<MessagePrioritization> {
+    return this.sortedWith(compareBy(MessagePrioritization::updatedDate))
+}
index b051483..0b35e38 100644 (file)
@@ -33,20 +33,20 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
 
     @Query(
         "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
-                "ORDER BY pm.createdDate asc"
+            "ORDER BY pm.createdDate asc"
     )
     fun findByGroupAndStateIn(group: String, states: List<String>, count: Pageable): List<MessagePrioritization>?
 
     @Query(
         "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
-                "ORDER BY pm.updatedDate asc"
+            "ORDER BY pm.updatedDate asc"
     )
     fun findByGroupAndStateInOrderByUpdatedDate(group: String, states: List<String>, count: Pageable):
-            List<MessagePrioritization>?
+        List<MessagePrioritization>?
 
     @Query(
         "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
-                "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc"
+            "AND pm.expiryDate > :expiryCheckDate ORDER BY pm.createdDate asc"
     )
     fun findByGroupAndStateInAndNotExpiredDate(
         group: String,
@@ -57,7 +57,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
 
     @Query(
         "FROM MessagePrioritization pm WHERE pm.state in :states " +
-                "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+            "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
     )
     fun findByStateInAndExpiredDate(
         states: List<String>,
@@ -67,7 +67,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
 
     @Query(
         "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
-                "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+            "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
     )
     fun findByGroupAndStateInAndExpiredDate(
         group: String,
@@ -78,20 +78,25 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
 
     @Query(
         "FROM MessagePrioritization pm WHERE pm.group = :group " +
-                "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+            "AND pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
     )
-    fun findByByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+    fun findByGroupAndExpiredDate(group: String, expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
+
+    @Query(
+        "FROM MessagePrioritization pm WHERE  pm.expiryDate < :expiryCheckDate ORDER BY pm.createdDate asc"
+    )
+    fun findByExpiredDate(expiryCheckDate: Date, count: Pageable): List<MessagePrioritization>?
 
     @Query(
         "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
-                "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
+            "AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
     )
     fun findByGroupAndCorrelationId(group: String, states: List<String>, correlationId: String):
-            List<MessagePrioritization>?
+        List<MessagePrioritization>?
 
     @Query(
         "FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state in :states " +
-                "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
+            "AND pm.type in :types AND pm.correlationId = :correlationId ORDER BY pm.createdDate asc"
     )
     fun findByGroupAndTypesAndCorrelationId(
         group: String,
@@ -104,7 +109,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
     @Transactional
     @Query(
         "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
-                "WHERE id = :id"
+            "WHERE id = :id"
     )
     fun setStateForMessageId(id: String, state: String, currentDate: Date): Int
 
@@ -112,7 +117,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
     @Transactional
     @Query(
         "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
-                "WHERE id = :id"
+            "WHERE id = :id"
     )
     fun setPriorityForMessageId(id: String, priority: String, currentDate: Date): Int
 
@@ -120,7 +125,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
     @Transactional
     @Query(
         "UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
-                "WHERE id IN :ids"
+            "WHERE id IN :ids"
     )
     fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int
 
@@ -128,7 +133,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
     @Transactional
     @Query(
         "UPDATE MessagePrioritization SET priority = :priority, updatedDate = :currentDate " +
-                "WHERE id IN :ids"
+            "WHERE id IN :ids"
     )
     fun setPriorityForMessageIds(ids: List<String>, priority: String, currentDate: Date): Int
 
@@ -136,7 +141,7 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
     @Transactional
     @Query(
         "UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " +
-                "WHERE id = :id"
+            "WHERE id = :id"
     )
     fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int
 
@@ -144,17 +149,27 @@ interface PrioritizationMessageRepository : JpaRepository<MessagePrioritization,
     @Transactional
     @Query(
         "UPDATE MessagePrioritization SET state = :state, " +
-                "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id"
+            "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id"
     )
     fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int
 
     @Modifying
     @Transactional
-    @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group")
+    @Query("DELETE FROM MessagePrioritization WHERE id IN :ids")
+    fun deleteByIds(ids: List<String>)
+
+    @Modifying
+    @Transactional
+    @Query("DELETE FROM MessagePrioritization WHERE expiryDate > :expiryCheckDate ")
+    fun deleteByExpiryDate(expiryCheckDate: Date)
+
+    @Modifying
+    @Transactional
+    @Query("DELETE FROM MessagePrioritization WHERE group = :group")
     fun deleteGroup(group: String)
 
     @Modifying
     @Transactional
-    @Query("DELETE FROM MessagePrioritization pm WHERE pm.group = :group AND pm.state IN :states")
+    @Query("DELETE FROM MessagePrioritization WHERE group = :group AND state IN :states")
     fun deleteGroupAndStateIn(group: String, states: List<String>)
 }
index c14a404..624a69f 100644 (file)
@@ -16,9 +16,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
 
-import org.apache.kafka.streams.processor.Cancellable
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
 import org.apache.kafka.streams.processor.To
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
@@ -28,7 +26,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
 
 open class DefaultMessagePrioritizeProcessor(
     private val messagePrioritizationStateService: MessagePrioritizationStateService,
@@ -37,15 +34,11 @@ open class DefaultMessagePrioritizeProcessor(
 
     private val log = logger(DefaultMessagePrioritizeProcessor::class)
 
-    lateinit var expiryCancellable: Cancellable
-    lateinit var cleanCancellable: Cancellable
-
     override suspend fun processNB(key: ByteArray, value: ByteArray) {
 
         val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
             ?: throw BluePrintProcessorException("failed to convert")
         try {
-            messagePrioritizationService.setKafkaProcessorContext(processorContext)
             messagePrioritizationService.prioritize(messagePrioritize)
         } catch (e: Exception) {
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
@@ -65,10 +58,8 @@ open class DefaultMessagePrioritizeProcessor(
 
     override fun init(context: ProcessorContext) {
         super.init(context)
-        /** set up expiry marking cron */
-        initializeExpiryPunctuator()
-        /** Set up cleaning records cron */
-        initializeCleanPunctuator()
+        /** Set Configuration and Processor Context to messagePrioritizationService */
+        messagePrioritizationService.setKafkaProcessorContext(processorContext)
     }
 
     override fun close() {
@@ -76,40 +67,5 @@ open class DefaultMessagePrioritizeProcessor(
             "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
                 "taskId(${processorContext.taskId()})"
         )
-        expiryCancellable.cancel()
-        cleanCancellable.cancel()
-    }
-
-    open fun initializeExpiryPunctuator() {
-        val expiryPunctuator =
-            MessagePriorityExpiryPunctuator(
-                messagePrioritizationStateService
-            )
-        expiryPunctuator.processorContext = processorContext
-        expiryPunctuator.configuration = prioritizationConfiguration
-        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
-        expiryCancellable = processorContext.schedule(
-            Duration.ofMillis(expiryConfiguration.frequencyMilli),
-            PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
-        )
-        log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
-    }
-
-    open fun initializeCleanPunctuator() {
-        val cleanPunctuator =
-            MessagePriorityCleanPunctuator(
-                messagePrioritizationStateService
-            )
-        cleanPunctuator.processorContext = processorContext
-        cleanPunctuator.configuration = prioritizationConfiguration
-        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
-        cleanCancellable = processorContext.schedule(
-            Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
-            PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
-        )
-        log.info(
-            "Clean punctuator setup complete with expiry " +
-                "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
-        )
     }
 }
index d7666a2..fb7cfd1 100644 (file)
@@ -26,6 +26,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
 
@@ -46,6 +47,9 @@ open class MessagePrioritizationConsumer(
         KafkaStreamConsumerFunction {
         return object : KafkaStreamConsumerFunction {
 
+            val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+                ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
             override suspend fun createTopology(
                 messageConsumerProperties: MessageConsumerProperties,
                 additionalConfig: Map<String, Any>?
@@ -72,7 +76,7 @@ open class MessagePrioritizationConsumer(
                 /** To receive completed and error messages */
                 topology.addSink(
                     MessagePrioritizationConstants.SINK_OUTPUT,
-                    prioritizationConfiguration.outputTopic,
+                    kafkaConsumerConfiguration.outputTopic,
                     Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
                     MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                 )
@@ -84,7 +88,11 @@ open class MessagePrioritizationConsumer(
     }
 
     suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
-        streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)
+
+        val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
+            ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")
+
+        streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector)
 
         // Dynamic Consumer Function to create Topology
         val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/kafka/MessagePrioritizationPunctuators.kt
deleted file mode 100644 (file)
index e27cf16..0000000
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright Â© 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-class MessagePriorityExpiryPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
-    AbstractBluePrintMessagePunctuator() {
-
-    private val log = logger(MessagePriorityExpiryPunctuator::class)
-    lateinit var configuration: PrioritizationConfiguration
-
-    override suspend fun punctuateNB(timestamp: Long) {
-
-        log.info(
-            "**** executing expiry punctuator applicationId(${processorContext.applicationId()}), " +
-                    "taskId(${processorContext.taskId()})"
-        )
-        val expiryConfiguration = configuration.expiryConfiguration
-        val fetchMessages = messagePrioritizationStateService
-            .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
-
-        val expiredIds = fetchMessages?.map { it.id }
-        if (expiredIds != null && expiredIds.isNotEmpty()) {
-            messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
-            fetchMessages.forEach { expired ->
-                processorContext.forward(
-                    expired.id, expired,
-                    To.child(MessagePrioritizationConstants.SINK_OUTPUT)
-                )
-            }
-        }
-    }
-}
-
-class MessagePriorityCleanPunctuator(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
-    AbstractBluePrintMessagePunctuator() {
-
-    private val log = logger(MessagePriorityCleanPunctuator::class)
-    lateinit var configuration: PrioritizationConfiguration
-
-    override suspend fun punctuateNB(timestamp: Long) {
-        log.info(
-            "**** executing clean punctuator applicationId(${processorContext.applicationId()}), " +
-                    "taskId(${processorContext.taskId()})"
-        )
-        // TODO
-    }
-}
index 13c0dd7..9314032 100644 (file)
@@ -18,11 +18,14 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
 
 import org.apache.kafka.streams.processor.ProcessorContext
 import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -49,7 +52,7 @@ abstract class AbstractMessagePrioritizationService(
             messagePrioritizationStateService.saveMessage(messagePrioritize)
             handleCorrelationAndNextStep(messagePrioritize)
             /** Cluster unLock for message group */
-            MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
+            MessageProcessorUtils.prioritizationUnLock(clusterLock)
         } catch (e: Exception) {
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
             log.error(messagePrioritize.error)
@@ -61,12 +64,50 @@ abstract class AbstractMessagePrioritizationService(
         }
     }
 
-    override suspend fun output(id: String) {
-        log.info("$$$$$ received in output processor id($id)")
-        val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
-        /** Check for Kafka Processing, If yes, then send to the output topic */
-        if (this.processorContext != null) {
-            processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+    override suspend fun output(messages: List<MessagePrioritization>) {
+        log.info("$$$$$ received in output processor id(${messages.ids()})")
+        messages.forEach { message ->
+            val message = messagePrioritizationStateService.updateMessageState(message.id, MessageState.COMPLETED.name)
+            /** Check for Kafka Processing, If yes, then send to the output topic */
+            if (this.processorContext != null) {
+                processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+            }
+        }
+    }
+
+    override suspend fun updateExpiredMessages(expiryConfiguration: ExpiryConfiguration) {
+        val clusterLock = MessageProcessorUtils.prioritizationExpiryLock()
+        try {
+            val fetchMessages = messagePrioritizationStateService
+                .getExpiryEligibleMessages(expiryConfiguration.maxPollRecord)
+            val expiredIds = fetchMessages?.ids()
+            if (expiredIds != null && expiredIds.isNotEmpty()) {
+                messagePrioritizationStateService.updateMessagesState(expiredIds, MessageState.EXPIRED.name)
+                if (processorContext != null) {
+                    fetchMessages.forEach { expired ->
+                        expired.state = MessageState.EXPIRED.name
+                        processorContext!!.forward(
+                            expired.id, expired,
+                            To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+                        )
+                    }
+                }
+            }
+        } catch (e: Exception) {
+            log.error("failed in updating expired messages", e)
+        } finally {
+            MessageProcessorUtils.prioritizationUnLock(clusterLock)
+        }
+    }
+
+    override suspend fun cleanExpiredMessage(cleanConfiguration: CleanConfiguration) {
+        val clusterLock = MessageProcessorUtils.prioritizationCleanLock()
+        try {
+            messagePrioritizationStateService.deleteExpiredMessage(cleanConfiguration.expiredRecordsHoldDays)
+        } catch (e: Exception) {
+            log.error("failed in clean expired messages", e)
+        } finally {
+            MessageProcessorUtils.prioritizationUnLock(clusterLock)
         }
     }
 
@@ -78,7 +119,8 @@ abstract class AbstractMessagePrioritizationService(
             val correlationId = messagePrioritization.correlationId!!
             val types = getGroupCorrelationTypes(messagePrioritization)
             log.info(
-                "checking correlation for message($id), group($group), types($types), " +
+                "checking correlation for message($id), group($group), type(${messagePrioritization.type}), " +
+                    "correlation types($types), priority(${messagePrioritization.priority}), " +
                     "correlation id($correlationId)"
             )
 
@@ -96,57 +138,50 @@ abstract class AbstractMessagePrioritizationService(
                     .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
 
                 if (correlationResults.correlated) {
-                    /** Correlation  satisfied */
-                    val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
-                    /**  Send only correlated ids to aggregate processor */
-                    aggregate(correlatedIds)
+                    /** Update all messages to Aggregated state */
+                    messagePrioritizationStateService.setMessagesState(
+                        waitingCorrelatedStoreMessages.ids(),
+                        MessageState.PRIORITIZED.name
+                    )
+                    /** Correlation  satisfied, Send only correlated messages to aggregate processor */
+                    aggregate(waitingCorrelatedStoreMessages)
                 } else {
                     /** Correlation not satisfied */
                     log.trace("correlation not matched : ${correlationResults.message}")
-                    val waitMessageIds = waitingCorrelatedStoreMessages.map { it.id }
                     // Update the Message state to Wait
-                    messagePrioritizationStateService.setMessagesState(waitMessageIds, MessageState.WAIT.name)
+                    messagePrioritizationStateService.setMessagesState(
+                        waitingCorrelatedStoreMessages.ids(),
+                        MessageState.WAIT.name
+                    )
                 }
             } else {
                 /** received first message of group and correlation Id, update the message with wait state */
                 messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.WAIT.name)
             }
         } else {
-            // No Correlation check needed, simply forward to next processor.
+            /** No Correlation check needed, simply forward to next processor. */
             messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
-            aggregate(messagePrioritization.id)
+            aggregate(arrayListOf(messagePrioritization))
         }
     }
 
-    open suspend fun aggregate(strIds: String) {
-        log.info("@@@@@ received in aggregation processor ids($strIds)")
-        val ids = strIds.split(",").map { it.trim() }
-        if (!ids.isNullOrEmpty()) {
+    open suspend fun aggregate(messages: List<MessagePrioritization>) {
+        log.info("@@@@@ received in aggregation processor ids(${messages.ids()}")
+        if (!messages.isNullOrEmpty()) {
             try {
-                if (ids.size == 1) {
-                    /** No aggregation or sequencing needed, simpley forward to next processor */
-                    output(ids.first())
-                } else {
-                    /** Implement Aggregation logic in overridden class, If necessary,
-                    Populate New Message and Update status with Prioritized, Forward the message to next processor */
-                    handleAggregation(ids)
-                    /** Update all messages to Aggregated state */
-                    messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
-                }
+                /** Implement Aggregation logic in overridden class, If necessary,
+                Populate New Message and Update status with Prioritized, Forward the message to next processor */
+                handleAggregation(messages)
             } catch (e: Exception) {
-                val error = "failed in Aggregate message($ids) : ${e.message}"
-                log.error(error, e)
-                val storeMessages = messagePrioritizationStateService.getMessages(ids)
-                if (!storeMessages.isNullOrEmpty()) {
-                    storeMessages.forEach { messagePrioritization ->
+                val error = "failed in aggregate message(${messages.ids()}) : ${e.message}"
+                if (!messages.isNullOrEmpty()) {
+                    messages.forEach { messagePrioritization ->
                         try {
                             /** Update the data store */
                             messagePrioritizationStateService.setMessageStateANdError(
                                 messagePrioritization.id,
                                 MessageState.ERROR.name, error
                             )
-                            /** Publish to output topic */
-                            output(messagePrioritization.id)
                         } catch (sendException: Exception) {
                             log.error(
                                 "failed to update/publish error message(${messagePrioritization.id}) : " +
@@ -154,6 +189,8 @@ abstract class AbstractMessagePrioritizationService(
                             )
                         }
                     }
+                    /** Publish to output topic */
+                    output(messages)
                 }
             }
         }
@@ -162,7 +199,7 @@ abstract class AbstractMessagePrioritizationService(
     /** Child will override this implementation , if necessary
      *  Here the place child has to implement custom Sequencing and Aggregation logic.
      * */
-    abstract suspend fun handleAggregation(messageIds: List<String>)
+    abstract suspend fun handleAggregation(messages: List<MessagePrioritization>)
 
     /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
      * otherwise correlation happens with group and correlationId */
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationSchedulerService.kt
new file mode 100644 (file)
index 0000000..b1c1fb1
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Copyright Â© 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.springframework.stereotype.Service
+
+@Service
+open class MessagePrioritizationSchedulerService(
+    private val messagePrioritizationService: MessagePrioritizationService
+) {
+    private val log = logger(MessagePrioritizationSchedulerService::class)
+
+    @Volatile
+    var keepGoing = true
+
+    /** This is sample scheduler implementation used during starting application with configuration.
+    @EventListener(ApplicationReadyEvent::class)
+    open fun init() = runBlocking {
+        log.info("Starting PrioritizationListeners...")
+        startScheduling(MessagePrioritizationSample.samplePrioritizationConfiguration())
+    }
+    */
+
+    open suspend fun startScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
+        log.info("Starting Prioritization Scheduler Service...")
+        GlobalScope.launch {
+            expiryScheduler(prioritizationConfiguration)
+        }
+        GlobalScope.launch {
+            cleanUpScheduler(prioritizationConfiguration)
+        }
+    }
+
+    open suspend fun shutdownScheduling(prioritizationConfiguration: PrioritizationConfiguration) {
+        keepGoing = false
+        delay(prioritizationConfiguration.shutDownConfiguration.waitMill)
+    }
+
+    private suspend fun expiryScheduler(
+        prioritizationConfiguration: PrioritizationConfiguration
+    ) {
+        val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+        log.info("Initializing prioritization expiry scheduler frequency(${expiryConfiguration.frequencyMilli})mSec")
+        withContext(Dispatchers.Default) {
+            while (keepGoing) {
+                try {
+                    messagePrioritizationService.updateExpiredMessages(expiryConfiguration)
+                    delay(expiryConfiguration.frequencyMilli)
+                } catch (e: Exception) {
+                    log.error("failed in prioritization expiry scheduler", e)
+                }
+            }
+        }
+    }
+
+    private suspend fun cleanUpScheduler(
+        prioritizationConfiguration: PrioritizationConfiguration
+    ) {
+        val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+        log.info("Initializing prioritization clean scheduler frequency(${cleanConfiguration.frequencyMilli})mSec")
+        withContext(Dispatchers.Default) {
+            while (keepGoing) {
+                try {
+                    messagePrioritizationService.cleanExpiredMessage(cleanConfiguration)
+                    delay(cleanConfiguration.frequencyMilli)
+                } catch (e: Exception) {
+                    log.error("failed in prioritization clean scheduler", e)
+                }
+            }
+        }
+    }
+}
index d9cd956..dde8d95 100644 (file)
@@ -23,6 +23,8 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.d
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.addDate
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.controllerDate
 import org.springframework.data.domain.PageRequest
 import org.springframework.stereotype.Service
 import org.springframework.transaction.annotation.Transactional
@@ -77,9 +79,15 @@ open class MessagePrioritizationStateServiceImpl(
         )
     }
 
+    override suspend fun getExpiredMessages(expiryDate: Date, count: Int): List<MessagePrioritization>? {
+        return prioritizationMessageRepository.findByExpiredDate(
+            expiryDate, PageRequest.of(0, count)
+        )
+    }
+
     override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
         List<MessagePrioritization>? {
-        return prioritizationMessageRepository.findByByGroupAndExpiredDate(
+        return prioritizationMessageRepository.findByGroupAndExpiredDate(
             group,
             expiryDate, PageRequest.of(0, count)
         )
@@ -142,21 +150,27 @@ open class MessagePrioritizationStateServiceImpl(
     }
 
     override suspend fun deleteMessage(id: String) {
-        return prioritizationMessageRepository.deleteById(id)
+        prioritizationMessageRepository.deleteById(id)
+        log.info("Prioritization Messages $id deleted successfully.")
     }
 
-    override suspend fun deleteMessageByGroup(group: String) {
-        return prioritizationMessageRepository.deleteGroup(group)
+    override suspend fun deleteMessages(ids: List<String>) {
+        prioritizationMessageRepository.deleteByIds(ids)
+        log.info("Prioritization Messages $ids deleted successfully.")
     }
 
-    override suspend fun deleteMessageStates(group: String, states: List<String>) {
-        return prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
+    override suspend fun deleteExpiredMessage(retentionDays: Int) {
+        val expiryCheckDate = controllerDate().addDate(retentionDays)
+        prioritizationMessageRepository.deleteByExpiryDate(expiryCheckDate)
     }
 
-    override suspend fun deleteExpiredMessage(group: String, retentionDays: Int) {
-        return prioritizationMessageRepository.deleteGroupAndStateIn(
-            group,
-            arrayListOf(MessageState.EXPIRED.name)
-        )
+    override suspend fun deleteMessageByGroup(group: String) {
+        prioritizationMessageRepository.deleteGroup(group)
+        log.info("Prioritization Messages group($group) deleted successfully.")
+    }
+
+    override suspend fun deleteMessageStates(group: String, states: List<String>) {
+        prioritizationMessageRepository.deleteGroupAndStateIn(group, states)
+        log.info("Prioritization Messages group($group) with states($states) deleted successfully.")
     }
 }
index fcdb71c..b7d878e 100644 (file)
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
 
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ids
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 
-open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+/** Sample Prioritization Service, Define spring service injector to register in application*/
+open class SampleMessagePrioritizationService(private val messagePrioritizationStateService: MessagePrioritizationStateService) :
     AbstractMessagePrioritizationService(messagePrioritizationStateService) {
 
     private val log = logger(DefaultMessagePrioritizeProcessor::class)
 
     /** Child overriding this implementation , if necessary */
-    override suspend fun handleAggregation(messageIds: List<String>) {
-        log.info("messages($messageIds) aggregated")
-        messageIds.forEach { id ->
-            output(id)
-        }
+    override suspend fun handleAggregation(messages: List<MessagePrioritization>) {
+        log.info("messages(${messages.ids()}) aggregated")
+        /** Sequence based on Priority and Updated Date */
+        val sequencedMessage = messages.orderByHighestPriority()
+        /** Update all messages to aggregated state */
+        messagePrioritizationStateService.setMessagesState(sequencedMessage.ids(), MessageState.AGGREGATED.name)
+        output(sequencedMessage)
     }
 
     /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
index 4a36a40..e497ef1 100644 (file)
@@ -18,6 +18,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
 
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.CleanConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ExpiryConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.KafkaConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.ShutDownConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
@@ -29,9 +30,11 @@ object MessagePrioritizationSample {
 
     fun samplePrioritizationConfiguration(): PrioritizationConfiguration {
         return PrioritizationConfiguration().apply {
-            inputTopicSelector = "prioritize-input"
-            outputTopic = "prioritize-output-topic"
-            expiredTopic = "prioritize-expired-topic"
+            kafkaConfiguration = KafkaConfiguration().apply {
+                inputTopicSelector = "prioritize-input"
+                outputTopic = "prioritize-output-topic"
+                expiredTopic = "prioritize-expired-topic"
+            }
             expiryConfiguration = ExpiryConfiguration().apply {
                 frequencyMilli = 10000L
                 maxPollRecord = 2000
@@ -46,6 +49,22 @@ object MessagePrioritizationSample {
         }
     }
 
+    fun sampleSchedulerPrioritizationConfiguration(): PrioritizationConfiguration {
+        return PrioritizationConfiguration().apply {
+            expiryConfiguration = ExpiryConfiguration().apply {
+                frequencyMilli = 10L
+                maxPollRecord = 2000
+            }
+            shutDownConfiguration = ShutDownConfiguration().apply {
+                waitMill = 20L
+            }
+            cleanConfiguration = CleanConfiguration().apply {
+                frequencyMilli = 10L
+                expiredRecordsHoldDays = 5
+            }
+        }
+    }
+
     private fun currentDatePlusDays(days: Int): Date {
         val calender = Calendar.getInstance()
         calender.add(Calendar.DATE, days)
@@ -68,7 +87,11 @@ object MessagePrioritizationSample {
         return messages
     }
 
-    fun sampleMessageWithSameCorrelation(groupName: String, messageState: String, count: Int): List<MessagePrioritization> {
+    fun sampleMessageWithSameCorrelation(
+        groupName: String,
+        messageState: String,
+        count: Int
+    ): List<MessagePrioritization> {
         val messages: MutableList<MessagePrioritization> = arrayListOf()
         repeat(count) {
             val backPressureMessage = createMessage(
@@ -108,7 +131,7 @@ object MessagePrioritizationSample {
             group = groupName
             type = messageType
             state = messageState
-            priority = 5
+            priority = (1..10).shuffled().first()
             correlationId = messageCorrelationId
             message = "I am the Message"
             createdDate = Date()
index 186499d..18b3e4d 100644 (file)
@@ -45,8 +45,32 @@ object MessageProcessorUtils {
         } else null
     }
 
+    /** Utility to create the cluster lock for expiry scheduler*/
+    suspend fun prioritizationExpiryLock(): ClusterLock? {
+        val clusterService = BluePrintDependencyService.optionalClusterService()
+        return if (clusterService != null && clusterService.clusterJoined()) {
+            val lockName = "prioritize-expiry"
+            val clusterLock = clusterService.clusterLock(lockName)
+            clusterLock.lock()
+            if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+            clusterLock
+        } else null
+    }
+
+    /** Utility to create the cluster lock for expiry scheduler*/
+    suspend fun prioritizationCleanLock(): ClusterLock? {
+        val clusterService = BluePrintDependencyService.optionalClusterService()
+        return if (clusterService != null && clusterService.clusterJoined()) {
+            val lockName = "prioritize-clean"
+            val clusterLock = clusterService.clusterLock(lockName)
+            clusterLock.lock()
+            if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
+            clusterLock
+        } else null
+    }
+
     /** Utility used to cluster unlock for message [clusterLock] */
-    suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) {
+    suspend fun prioritizationUnLock(clusterLock: ClusterLock?) {
         if (clusterLock != null) {
             clusterLock.unLock()
             clusterLock.close()
index ec0515c..190f4e8 100644 (file)
@@ -28,6 +28,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationSchedulerService
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
@@ -88,6 +89,9 @@ open class MessagePrioritizationConsumerTest {
     @Autowired
     lateinit var messagePrioritizationService: MessagePrioritizationService
 
+    @Autowired
+    lateinit var messagePrioritizationSchedulerService: MessagePrioritizationSchedulerService
+
     @Autowired
     lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
 
@@ -151,7 +155,7 @@ open class MessagePrioritizationConsumerTest {
             val configuration = MessagePrioritizationSample.samplePrioritizationConfiguration()
 
             val streamingConsumerService = bluePrintMessageLibPropertyService
-                .blueprintMessageConsumerService(configuration.inputTopicSelector)
+                .blueprintMessageConsumerService(configuration.kafkaConfiguration!!.inputTopicSelector)
             assertNotNull(streamingConsumerService, "failed to get blueprintMessageConsumerService")
 
             val spyStreamingConsumerService = spyk(streamingConsumerService)
@@ -176,6 +180,25 @@ open class MessagePrioritizationConsumerTest {
         }
     }
 
+    @Test
+    fun testSchedulerService() {
+        runBlocking {
+            val configuration = MessagePrioritizationSample.sampleSchedulerPrioritizationConfiguration()
+            assertTrue(
+                ::messagePrioritizationSchedulerService.isInitialized,
+                "failed to initialize messagePrioritizationSchedulerService"
+            )
+            launch {
+                messagePrioritizationSchedulerService.startScheduling(configuration)
+            }
+            launch {
+                /** To debug increase the delay time */
+                delay(20)
+                messagePrioritizationSchedulerService.shutdownScheduling(configuration)
+            }
+        }
+    }
+
     /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
     // @Test
     fun testMessagePrioritizationConsumer() {
index 3876cbb..73d3738 100644 (file)
@@ -19,6 +19,8 @@ package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.
 import org.junit.Test
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.orderByHighestPriority
+import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
 
 class MessageCorrelationUtilsTest {
@@ -59,10 +61,11 @@ class MessageCorrelationUtilsTest {
 
         /* Assumption is Same group with different types and one missing expected types,
         In this case type-3 message is missing */
-        val differentTypesWithSameCorrelationMessagesResWithMissingType = MessageCorrelationUtils.correlatedMessagesWithTypes(
-            differentTypesWithSameCorrelationMessages,
-            arrayListOf("type-0", "type-1", "type-2", "type-3")
-        )
+        val differentTypesWithSameCorrelationMessagesResWithMissingType =
+            MessageCorrelationUtils.correlatedMessagesWithTypes(
+                differentTypesWithSameCorrelationMessages,
+                arrayListOf("type-0", "type-1", "type-2", "type-3")
+            )
         assertTrue(
             !differentTypesWithSameCorrelationMessagesResWithMissingType.correlated,
             "failed to correlate differentTypesWithSameCorrelationMessagesResWithMissingType"
@@ -118,4 +121,12 @@ class MessageCorrelationUtilsTest {
             "failed to correlate differentTypesWithDifferentCorrelationMessageResp"
         )
     }
+
+    @Test
+    fun testPrioritizationOrdering() {
+        val differentPriorityMessages = MessagePrioritizationSample
+            .sampleMessageWithSameCorrelation("sample-group", MessageState.NEW.name, 5)
+        val orderedPriorityMessages = differentPriorityMessages.orderByHighestPriority()
+        assertNotNull(orderedPriorityMessages, "failed to order the priority messages")
+    }
 }
index 4fd907a..02dd202 100644 (file)
@@ -21,6 +21,7 @@ import java.text.SimpleDateFormat
 import java.time.LocalDateTime
 import java.time.ZoneId
 import java.time.format.DateTimeFormatter
+import java.util.Calendar
 import java.util.Date
 
 fun controllerDate(): Date {
@@ -45,3 +46,11 @@ fun Date.currentTimestamp(): String {
     val formatter = SimpleDateFormat(BluePrintConstants.DATE_TIME_PATTERN)
     return formatter.format(this)
 }
+
+/** Return incremented date for [number] of days */
+fun Date.addDate(number: Int): Date {
+    val calendar = Calendar.getInstance()
+    calendar.time = this
+    calendar.add(Calendar.DATE, number)
+    return calendar.time
+}