Formatting Code base with ktlint
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / topology / MessagePrioritizeProcessor.kt
index 7dde265..431e02f 100644 (file)
@@ -29,8 +29,7 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
 import java.time.Duration
-import java.util.*
-
+import java.util.UUID
 
 open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
 
@@ -42,7 +41,7 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
     override suspend fun processNB(key: ByteArray, value: ByteArray) {
         log.info("***** received in prioritize processor key(${String(key)})")
         val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
-                ?: throw BluePrintProcessorException("failed to convert")
+            ?: throw BluePrintProcessorException("failed to convert")
         try {
             // Save the Message
             messagePrioritizationStateService.saveMessage(messagePrioritize)
@@ -51,11 +50,15 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
             messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
             log.error(messagePrioritize.error)
             /** Update the data store */
-            messagePrioritizationStateService.setMessageStateANdError(messagePrioritize.id, MessageState.ERROR.name,
-                    messagePrioritize.error!!)
+            messagePrioritizationStateService.setMessageStateANdError(
+                messagePrioritize.id, MessageState.ERROR.name,
+                messagePrioritize.error!!
+            )
             /** Publish to Output topic */
-            this.processorContext.forward(messagePrioritize.id, messagePrioritize,
-                    To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+            this.processorContext.forward(
+                messagePrioritize.id, messagePrioritize,
+                To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+            )
         }
     }
 
@@ -68,8 +71,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
     }
 
     override fun close() {
-        log.info("closing prioritization processor applicationId(${processorContext.applicationId()}), " +
-                "taskId(${processorContext.taskId()})")
+        log.info(
+            "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+                    "taskId(${processorContext.taskId()})"
+        )
         expiryCancellable.cancel()
         cleanCancellable.cancel()
     }
@@ -79,8 +84,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
         expiryPunctuator.processorContext = processorContext
         expiryPunctuator.configuration = prioritizationConfiguration
         val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
-        expiryCancellable = processorContext.schedule(Duration.ofMillis(expiryConfiguration.frequencyMilli),
-                PunctuationType.WALL_CLOCK_TIME, expiryPunctuator)
+        expiryCancellable = processorContext.schedule(
+            Duration.ofMillis(expiryConfiguration.frequencyMilli),
+            PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
+        )
         log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
     }
 
@@ -89,10 +96,14 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
         cleanPunctuator.processorContext = processorContext
         cleanPunctuator.configuration = prioritizationConfiguration
         val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
-        cleanCancellable = processorContext.schedule(Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
-                PunctuationType.WALL_CLOCK_TIME, cleanPunctuator)
-        log.info("Clean punctuator setup complete with expiry " +
-                "hold(${cleanConfiguration.expiredRecordsHoldDays})days")
+        cleanCancellable = processorContext.schedule(
+            Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+            PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
+        )
+        log.info(
+            "Clean punctuator setup complete with expiry " +
+                    "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+        )
     }
 
     open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
@@ -102,25 +113,31 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
             val group = messagePrioritization.group
             val correlationId = messagePrioritization.correlationId!!
             val types = getGroupCorrelationTypes(messagePrioritization)
-            log.info("checking correlation for message($id), group($group), types($types), " +
-                    "correlation id($correlationId)")
+            log.info(
+                "checking correlation for message($id), group($group), types($types), " +
+                        "correlation id($correlationId)"
+            )
 
             /** Get all previously received messages from database for group and optional types and correlation Id */
-            val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(group,
-                    arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId)
+            val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
+                group,
+                arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+            )
 
             /** If multiple records found, then check correlation */
             if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
                 /** Check all correlation satisfies */
                 val correlationResults = MessageCorrelationUtils
-                        .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
+                    .correlatedMessagesWithTypes(waitingCorrelatedStoreMessages, types)
 
                 if (correlationResults.correlated) {
                     /** Correlation  satisfied */
                     val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
                     /**  Send only correlated ids to next processor */
-                    this.processorContext.forward(UUID.randomUUID().toString(), correlatedIds,
-                            To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+                    this.processorContext.forward(
+                        UUID.randomUUID().toString(), correlatedIds,
+                        To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+                    )
                 } else {
                     /** Correlation not satisfied */
                     log.trace("correlation not matched : ${correlationResults.message}")
@@ -135,8 +152,10 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
         } else {
             // No Correlation check needed, simply forward to next processor.
             messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
-            this.processorContext.forward(messagePrioritization.id, messagePrioritization.id,
-                    To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE))
+            this.processorContext.forward(
+                messagePrioritization.id, messagePrioritization.id,
+                To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
+            )
         }
     }
 
@@ -145,4 +164,4 @@ open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteA
     open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
         return null
     }
-}
\ No newline at end of file
+}