import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
-import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
object MessageProcessorUtils {
/** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/
suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? {
- val clusterService = BlueprintDependencyService.optionalClusterService()
+ val clusterService = BluePrintDependencyService.optionalClusterService()
return if (clusterService != null && clusterService.clusterJoined() &&
!messagePrioritization.correlationId.isNullOrBlank()
val lockName = "prioritize::${messagePrioritization.group}::$correlationId"
val clusterLock = clusterService.clusterLock(lockName)
clusterLock.lock()
- if (!clusterLock.isLocked()) throw BlueprintProcessorException("failed to lock($lockName)")
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
clusterLock
} else null
}
/** Utility to create the cluster lock for expiry scheduler*/
suspend fun prioritizationExpiryLock(): ClusterLock? {
- val clusterService = BlueprintDependencyService.optionalClusterService()
+ val clusterService = BluePrintDependencyService.optionalClusterService()
return if (clusterService != null && clusterService.clusterJoined()) {
val lockName = "prioritize-expiry"
val clusterLock = clusterService.clusterLock(lockName)
clusterLock.lock()
- if (!clusterLock.isLocked()) throw BlueprintProcessorException("failed to lock($lockName)")
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
clusterLock
} else null
}
/** Utility to create the cluster lock for expiry scheduler*/
suspend fun prioritizationCleanLock(): ClusterLock? {
- val clusterService = BlueprintDependencyService.optionalClusterService()
+ val clusterService = BluePrintDependencyService.optionalClusterService()
return if (clusterService != null && clusterService.clusterJoined()) {
val lockName = "prioritize-clean"
val clusterLock = clusterService.clusterLock(lockName)
clusterLock.lock()
- if (!clusterLock.isLocked()) throw BlueprintProcessorException("failed to lock($lockName)")
+ if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
clusterLock
} else null
}
fun <K, V> bluePrintProcessorSupplier(name: String): ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
// Dynamically resolve the Prioritization Processor
- BlueprintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
+ BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
}
}
}