Refactored to support both Kafka and Rest endpoint prioritization.
Simplified number for Kafka processors.
Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Iba77ed94be3398940840ff01a298f0bec785401f
------------------
kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-input-topic
kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-output-topic
-kafka-topics --zookeeper localhost:2181 --delete --topic prioritize-expired-topic
kafka-topics --zookeeper localhost:2181 --delete --topic test-prioritize-application-PriorityMessage-changelog
Create Topics
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-input-topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-output-topic
-kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic prioritize-expired-topic
To List topics
----------------
kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-output-topic --from-beginning
kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-input-topic --from-beginning
-
-kafka-console-consumer --bootstrap-server localhost:9092 --topic prioritize-expired-topic --from-beginning
-
const val SOURCE_INPUT = "source-prioritization-input"
const val PROCESSOR_PRIORITIZE = "processor-prioritization-prioritize"
- const val PROCESSOR_AGGREGATE = "processor-prioritization-aggregate"
- const val PROCESSOR_OUTPUT = "processor-prioritization-output"
const val SINK_OUTPUT = "sink-prioritization-output"
- const val SINK_EXPIRED = "sink-prioritization-expired"
}
--- /dev/null
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+
+interface MessagePrioritizationService {
+
+ fun setKafkaProcessorContext(processorContext: ProcessorContext?)
+
+ suspend fun prioritize(messagePrioritization: MessagePrioritization)
+
+ suspend fun output(id: String)
+}
--- /dev/null
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import java.util.Date
+
+interface MessagePrioritizationStateService {
+
+ suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
+
+ suspend fun getMessage(id: String): MessagePrioritization
+
+ suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
+
+ suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
+
+ suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
+ List<MessagePrioritization>?
+
+ suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
+
+ suspend fun getCorrelatedMessages(
+ group: String,
+ states: List<String>,
+ types: List<String>?,
+ correlationIds: String
+ ): List<MessagePrioritization>?
+
+ suspend fun updateMessagesState(ids: List<String>, state: String)
+
+ suspend fun updateMessageState(id: String, state: String): MessagePrioritization
+
+ suspend fun setMessageState(id: String, state: String)
+
+ suspend fun setMessagesPriority(ids: List<String>, priority: String)
+
+ suspend fun setMessagesState(ids: List<String>, state: String)
+
+ suspend fun setMessageStateANdError(id: String, state: String, error: String)
+
+ suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
+
+ suspend fun deleteMessage(id: String)
+
+ suspend fun deleteMessageByGroup(group: String)
+
+ suspend fun deleteMessageStates(group: String, states: List<String>)
+
+ suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
+}
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
import org.springframework.http.MediaType
import org.springframework.web.bind.annotation.GetMapping
@RestController
@RequestMapping(value = ["/api/v1/message-prioritization"])
-open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) {
+open class MessagePrioritizationApi(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService,
+ private val messagePrioritizationService: MessagePrioritizationService
+) {
@GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE])
@ResponseBody
messagePrioritizationStateService.saveMessage(messagePrioritization)
}
+ @PostMapping(
+ path = ["/prioritize"], produces = [MediaType.APPLICATION_JSON_VALUE],
+ consumes = [MediaType.APPLICATION_JSON_VALUE]
+ )
+ @ResponseBody
+ fun prioritize(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc {
+ messagePrioritizationService.prioritize(messagePrioritization)
+ }
+
@PostMapping(
path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
consumes = [MediaType.APPLICATION_JSON_VALUE]
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.ProcessorContext
-import org.onap.ccsdk.cds.blueprintsprocessor.atomix.clusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessageProcessor
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
-/** CDS Message Prioritazation Kafka Stream Processor abstract class to implement */
+/** CDS Message Prioritization Kafka Stream Processor abstract class to implement */
abstract class AbstractMessagePrioritizeProcessor<K, V> : AbstractBluePrintMessageProcessor<K, V>() {
private val log = logger(AbstractMessagePrioritizeProcessor::class)
lateinit var prioritizationConfiguration: PrioritizationConfiguration
- lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
- var clusterService: BluePrintClusterService? = null
- override fun init(context: ProcessorContext) {
- this.processorContext = context
- /** Get the State service to update in store */
- this.messagePrioritizationStateService = BluePrintDependencyService
- .messagePrioritizationStateService()
- }
-
- /** Cluster Service is not enabled by default for all processors, In needed initialize from processor init method */
- open fun initializeClusterService() {
- /** Get the Cluster service to update in store */
- if (BluePrintConstants.CLUSTER_ENABLED) {
- this.clusterService = BluePrintDependencyService.clusterService()
- }
+ override fun init(processorContext: ProcessorContext) {
+ this.processorContext = processorContext
}
}
--- /dev/null
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
+
+import org.apache.kafka.streams.processor.Cancellable
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.PunctuationType
+import org.apache.kafka.streams.processor.To
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.time.Duration
+
+open class DefaultMessagePrioritizeProcessor(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService,
+ private val messagePrioritizationService: MessagePrioritizationService
+) : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+
+ private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+ lateinit var expiryCancellable: Cancellable
+ lateinit var cleanCancellable: Cancellable
+
+ override suspend fun processNB(key: ByteArray, value: ByteArray) {
+
+ val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+ ?: throw BluePrintProcessorException("failed to convert")
+ try {
+ messagePrioritizationService.setKafkaProcessorContext(processorContext)
+ messagePrioritizationService.prioritize(messagePrioritize)
+ } catch (e: Exception) {
+ messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
+ log.error(messagePrioritize.error)
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritize.id, MessageState.ERROR.name,
+ messagePrioritize.error!!
+ )
+ /** Publish to Output topic */
+ this.processorContext.forward(
+ messagePrioritize.id, messagePrioritize,
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
+ )
+ }
+ }
+
+ override fun init(context: ProcessorContext) {
+ super.init(context)
+ /** set up expiry marking cron */
+ initializeExpiryPunctuator()
+ /** Set up cleaning records cron */
+ initializeCleanPunctuator()
+ }
+
+ override fun close() {
+ log.info(
+ "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
+ "taskId(${processorContext.taskId()})"
+ )
+ expiryCancellable.cancel()
+ cleanCancellable.cancel()
+ }
+
+ open fun initializeExpiryPunctuator() {
+ val expiryPunctuator =
+ MessagePriorityExpiryPunctuator(
+ messagePrioritizationStateService
+ )
+ expiryPunctuator.processorContext = processorContext
+ expiryPunctuator.configuration = prioritizationConfiguration
+ val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
+ expiryCancellable = processorContext.schedule(
+ Duration.ofMillis(expiryConfiguration.frequencyMilli),
+ PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
+ )
+ log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
+ }
+
+ open fun initializeCleanPunctuator() {
+ val cleanPunctuator =
+ MessagePriorityCleanPunctuator(
+ messagePrioritizationStateService
+ )
+ cleanPunctuator.processorContext = processorContext
+ cleanPunctuator.configuration = prioritizationConfiguration
+ val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
+ cleanCancellable = processorContext.schedule(
+ Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
+ PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
+ )
+ log.info(
+ "Clean punctuator setup complete with expiry " +
+ "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
+ )
+ }
+}
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
}
open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
- KafkaStreamConsumerFunction {
+ KafkaStreamConsumerFunction {
return object : KafkaStreamConsumerFunction {
override suspend fun createTopology(
val topology = Topology()
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
- as KafkaStreamsBasicAuthConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
log.info("Consuming prioritization topics($topics)")
MessagePrioritizationConstants.SOURCE_INPUT
)
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- bluePrintProcessorSupplier<String, String>(
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- prioritizationConfiguration
- ),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- bluePrintProcessorSupplier<String, String>(
- MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- prioritizationConfiguration
- ),
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE
- )
-
- topology.addSink(
- MessagePrioritizationConstants.SINK_EXPIRED,
- prioritizationConfiguration.expiredTopic,
- Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
/** To receive completed and error messages */
topology.addSink(
MessagePrioritizationConstants.SINK_OUTPUT,
prioritizationConfiguration.outputTopic,
Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- MessagePrioritizationConstants.PROCESSOR_OUTPUT
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
)
// Output will be sent to the group-output topic from Processor API
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.streams.processor.To
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.message.kafka.AbstractBluePrintMessagePunctuator
import org.onap.ccsdk.cds.controllerblueprints.core.logger
fetchMessages.forEach { expired ->
processorContext.forward(
expired.id, expired,
- To.child(MessagePrioritizationConstants.SINK_EXPIRED)
+ To.child(MessagePrioritizationConstants.SINK_OUTPUT)
)
}
}
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.serialization.Serde
* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
-import org.apache.kafka.streams.processor.Cancellable
import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageCorrelationUtils
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils
-import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
-import java.time.Duration
-import java.util.UUID
-open class MessagePrioritizeProcessor : AbstractMessagePrioritizeProcessor<ByteArray, ByteArray>() {
+/** Child should implement with sequencing & aggregation handling along with group type correlation mappings.*/
+abstract class AbstractMessagePrioritizationService(
+ private val messagePrioritizationStateService: MessagePrioritizationStateService
+) : MessagePrioritizationService {
- private val log = logger(MessagePrioritizeProcessor::class)
+ private val log = logger(AbstractMessagePrioritizationService::class)
- lateinit var expiryCancellable: Cancellable
- lateinit var cleanCancellable: Cancellable
+ var processorContext: ProcessorContext? = null
- override suspend fun processNB(key: ByteArray, value: ByteArray) {
- log.info("***** received in prioritize processor key(${String(key)})")
- val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
- ?: throw BluePrintProcessorException("failed to convert")
+ override fun setKafkaProcessorContext(processorContext: ProcessorContext?) {
+ this.processorContext = processorContext
+ }
+
+ override suspend fun prioritize(messagePrioritize: MessagePrioritization) {
try {
+ log.info("***** received in prioritize processor key(${messagePrioritize.id})")
/** Get the cluster lock for message group */
- val clusterLock = MessageProcessorUtils.prioritizationGrouplock(clusterService, messagePrioritize)
+ val clusterLock = MessageProcessorUtils.prioritizationGrouplock(messagePrioritize)
// Save the Message
messagePrioritizationStateService.saveMessage(messagePrioritize)
handleCorrelationAndNextStep(messagePrioritize)
/** Cluster unLock for message group */
- MessageProcessorUtils.prioritizationGroupUnLock(clusterService, clusterLock)
+ MessageProcessorUtils.prioritizationGroupUnLock(clusterLock)
} catch (e: Exception) {
messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
log.error(messagePrioritize.error)
messagePrioritize.id, MessageState.ERROR.name,
messagePrioritize.error!!
)
- /** Publish to Output topic */
- this.processorContext.forward(
- messagePrioritize.id, messagePrioritize,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
}
}
- override fun init(context: ProcessorContext) {
- super.init(context)
- /** set up expiry marking cron */
- initializeExpiryPunctuator()
- /** Set up cleaning records cron */
- initializeCleanPunctuator()
- /** Set up Cluster Service */
- initializeClusterService()
- }
-
- override fun close() {
- log.info(
- "closing prioritization processor applicationId(${processorContext.applicationId()}), " +
- "taskId(${processorContext.taskId()})"
- )
- expiryCancellable.cancel()
- cleanCancellable.cancel()
- }
-
- open fun initializeExpiryPunctuator() {
- val expiryPunctuator = MessagePriorityExpiryPunctuator(messagePrioritizationStateService)
- expiryPunctuator.processorContext = processorContext
- expiryPunctuator.configuration = prioritizationConfiguration
- val expiryConfiguration = prioritizationConfiguration.expiryConfiguration
- expiryCancellable = processorContext.schedule(
- Duration.ofMillis(expiryConfiguration.frequencyMilli),
- PunctuationType.WALL_CLOCK_TIME, expiryPunctuator
- )
- log.info("Expiry punctuator setup complete with frequency(${expiryConfiguration.frequencyMilli})mSec")
- }
-
- open fun initializeCleanPunctuator() {
- val cleanPunctuator = MessagePriorityCleanPunctuator(messagePrioritizationStateService)
- cleanPunctuator.processorContext = processorContext
- cleanPunctuator.configuration = prioritizationConfiguration
- val cleanConfiguration = prioritizationConfiguration.cleanConfiguration
- cleanCancellable = processorContext.schedule(
- Duration.ofDays(cleanConfiguration.expiredRecordsHoldDays.toLong()),
- PunctuationType.WALL_CLOCK_TIME, cleanPunctuator
- )
- log.info(
- "Clean punctuator setup complete with expiry " +
- "hold(${cleanConfiguration.expiredRecordsHoldDays})days"
- )
+ override suspend fun output(id: String) {
+ log.info("$$$$$ received in output processor id($id)")
+ val message = messagePrioritizationStateService.updateMessageState(id, MessageState.COMPLETED.name)
+ /** Check for Kafka Processing, If yes, then send to the output topic */
+ if (this.processorContext != null) {
+ processorContext!!.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+ }
}
open suspend fun handleCorrelationAndNextStep(messagePrioritization: MessagePrioritization) {
)
/** Get all previously received messages from database for group and optional types and correlation Id */
- val waitingCorrelatedStoreMessages = messagePrioritizationStateService.getCorrelatedMessages(
- group,
- arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
- )
+ val waitingCorrelatedStoreMessages = messagePrioritizationStateService
+ .getCorrelatedMessages(
+ group,
+ arrayListOf(MessageState.NEW.name, MessageState.WAIT.name), types, correlationId
+ )
/** If multiple records found, then check correlation */
if (!waitingCorrelatedStoreMessages.isNullOrEmpty() && waitingCorrelatedStoreMessages.size > 1) {
if (correlationResults.correlated) {
/** Correlation satisfied */
- val correlatedIds = waitingCorrelatedStoreMessages.map { it.id }.joinToString(",")
- /** Send only correlated ids to next processor */
- this.processorContext.forward(
- UUID.randomUUID().toString(), correlatedIds,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
- )
+ val correlatedIds = waitingCorrelatedStoreMessages.joinToString(",") { it.id }
+ /** Send only correlated ids to aggregate processor */
+ aggregate(correlatedIds)
} else {
/** Correlation not satisfied */
log.trace("correlation not matched : ${correlationResults.message}")
} else {
// No Correlation check needed, simply forward to next processor.
messagePrioritizationStateService.setMessageState(messagePrioritization.id, MessageState.PRIORITIZED.name)
- this.processorContext.forward(
- messagePrioritization.id, messagePrioritization.id,
- To.child(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
- )
+ aggregate(messagePrioritization.id)
+ }
+ }
+
+ open suspend fun aggregate(strIds: String) {
+ log.info("@@@@@ received in aggregation processor ids($strIds)")
+ val ids = strIds.split(",").map { it.trim() }
+ if (!ids.isNullOrEmpty()) {
+ try {
+ if (ids.size == 1) {
+ /** No aggregation or sequencing needed, simpley forward to next processor */
+ output(ids.first())
+ } else {
+ /** Implement Aggregation logic in overridden class, If necessary,
+ Populate New Message and Update status with Prioritized, Forward the message to next processor */
+ handleAggregation(ids)
+ /** Update all messages to Aggregated state */
+ messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+ }
+ } catch (e: Exception) {
+ val error = "failed in Aggregate message($ids) : ${e.message}"
+ log.error(error, e)
+ val storeMessages = messagePrioritizationStateService.getMessages(ids)
+ if (!storeMessages.isNullOrEmpty()) {
+ storeMessages.forEach { messagePrioritization ->
+ try {
+ /** Update the data store */
+ messagePrioritizationStateService.setMessageStateANdError(
+ messagePrioritization.id,
+ MessageState.ERROR.name, error
+ )
+ /** Publish to output topic */
+ output(messagePrioritization.id)
+ } catch (sendException: Exception) {
+ log.error(
+ "failed to update/publish error message(${messagePrioritization.id}) : " +
+ "${sendException.message}", e
+ )
+ }
+ }
+ }
+ }
}
}
+ /** Child will override this implementation , if necessary
+ * Here the place child has to implement custom Sequencing and Aggregation logic.
+ * */
+ abstract suspend fun handleAggregation(messageIds: List<String>)
+
/** If consumer wants specific correlation with respect to group and types, then populate the specific types,
* otherwise correlation happens with group and correlationId */
- open fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- return null
- }
+ abstract fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>?
}
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
import org.springframework.transaction.annotation.Transactional
import java.util.Date
-interface MessagePrioritizationStateService {
-
- suspend fun saveMessage(message: MessagePrioritization): MessagePrioritization
-
- suspend fun getMessage(id: String): MessagePrioritization
-
- suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
-
- suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
-
- suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>?
-
- suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>?
-
- suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
-
- suspend fun getCorrelatedMessages(
- group: String,
- states: List<String>,
- types: List<String>?,
- correlationIds: String
- ): List<MessagePrioritization>?
-
- suspend fun updateMessagesState(ids: List<String>, state: String)
-
- suspend fun updateMessageState(id: String, state: String): MessagePrioritization
-
- suspend fun setMessageState(id: String, state: String)
-
- suspend fun setMessagesPriority(ids: List<String>, priority: String)
-
- suspend fun setMessagesState(ids: List<String>, state: String)
-
- suspend fun setMessageStateANdError(id: String, state: String, error: String)
-
- suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
-
- suspend fun deleteMessage(id: String)
-
- suspend fun deleteMessageByGroup(group: String)
-
- suspend fun deleteMessageStates(group: String, states: List<String>)
-
- suspend fun deleteExpiredMessage(group: String, retentionDays: Int)
-}
-
@Service
open class MessagePrioritizationStateServiceImpl(
private val prioritizationMessageRepository: PrioritizationMessageRepository
-) :
- MessagePrioritizationStateService {
+) : MessagePrioritizationStateService {
private val log = logger(MessagePrioritizationStateServiceImpl::class)
}
override suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByGroupAndStateInAndNotExpiredDate(
group,
states, Date(), PageRequest.of(0, count)
}
override suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByGroupAndStateInAndExpiredDate(
group,
states, Date(), PageRequest.of(0, count)
}
override suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int):
- List<MessagePrioritization>? {
+ List<MessagePrioritization>? {
return prioritizationMessageRepository.findByByGroupAndExpiredDate(
group,
expiryDate, PageRequest.of(0, count)
--- /dev/null
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+
+open class SampleMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+ AbstractMessagePrioritizationService(messagePrioritizationStateService) {
+
+ private val log = logger(DefaultMessagePrioritizeProcessor::class)
+
+ /** Child overriding this implementation , if necessary */
+ override suspend fun handleAggregation(messageIds: List<String>) {
+ log.info("messages($messageIds) aggregated")
+ messageIds.forEach { id ->
+ output(id)
+ }
+ }
+
+ /** If consumer wants specific correlation with respect to group and types, then populate the specific types,
+ * otherwise correlation happens with group and correlationId */
+ override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
+ return when (messagePrioritization.group) {
+ /** Dummy Implementation, This can also be read from file and stored as cached map **/
+ "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
+ else -> null
+ }
+ }
+}
+++ /dev/null
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-open class MessageAggregateProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
-
- private val log = logger(MessageAggregateProcessor::class)
-
- override suspend fun processNB(key: String, value: String) {
-
- log.info("@@@@@ received in aggregation processor key($key), value($value)")
- val ids = value.split(",").map { it.trim() }
- if (!ids.isNullOrEmpty()) {
- try {
- if (ids.size == 1) {
- processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
- } else {
- /** Implement Aggregation logic in overridden class, If necessary,
- Populate New Message and Update status with Prioritized, Forward the message to next processor */
- handleAggregation(ids)
- /** Update all messages to Aggregated state */
- messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
- }
- } catch (e: Exception) {
- val error = "failed in Aggregate message($ids) : ${e.message}"
- log.error(error, e)
- val storeMessages = messagePrioritizationStateService.getMessages(ids)
- if (!storeMessages.isNullOrEmpty()) {
- storeMessages.forEach { messagePrioritization ->
- try {
- /** Update the data store */
- messagePrioritizationStateService.setMessageStateANdError(
- messagePrioritization.id,
- MessageState.ERROR.name, error
- )
- /** Publish to Error topic */
- this.processorContext.forward(
- messagePrioritization.id, messagePrioritization,
- To.child(MessagePrioritizationConstants.SINK_OUTPUT)
- )
- } catch (sendException: Exception) {
- log.error(
- "failed to update/publish error message(${messagePrioritization.id}) : " +
- "${sendException.message}", e
- )
- }
- }
- }
- }
- }
- }
-
- /** Child will override this implementation , if necessary */
- open suspend fun handleAggregation(messageIds: List<String>) {
- log.info("messages($messageIds) aggregated")
- messageIds.forEach { id ->
- processorContext.forward(id, id, To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
- }
- }
-}
+++ /dev/null
-/*
- * Copyright © 2018-2019 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology
-
-import org.apache.kafka.streams.processor.To
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessageState
-import org.onap.ccsdk.cds.controllerblueprints.core.logger
-
-open class MessageOutputProcessor : AbstractMessagePrioritizeProcessor<String, String>() {
-
- private val log = logger(MessageOutputProcessor::class)
-
- override suspend fun processNB(key: String, value: String) {
- log.info("$$$$$ received in output processor key($key), value($value)")
- val message = messagePrioritizationStateService.updateMessageState(value, MessageState.COMPLETED.name)
- processorContext.forward(message.id, message, To.child(MessagePrioritizationConstants.SINK_OUTPUT))
- }
-}
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
import org.apache.kafka.streams.processor.ProcessorSupplier
-import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.blueprintsprocessor.atomix.optionalClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.ClusterLock
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.AbstractMessagePrioritizeProcessor
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.toFormatedCorrelation
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
object MessageProcessorUtils {
- /** Utility to create the cluster lock for message [messagePrioritization] */
- suspend fun prioritizationGrouplock(
- clusterService: BluePrintClusterService?,
- messagePrioritization: MessagePrioritization
- ): ClusterLock? {
+ /** Utility to create the cluster lock for message [messagePrioritization] prioritization procssing.*/
+ suspend fun prioritizationGrouplock(messagePrioritization: MessagePrioritization): ClusterLock? {
+ val clusterService = BluePrintDependencyService.optionalClusterService()
+
return if (clusterService != null && clusterService.clusterJoined() &&
!messagePrioritization.correlationId.isNullOrBlank()
) {
// Get the correlation key in ascending order, even it it is misplaced
val correlationId = messagePrioritization.toFormatedCorrelation()
- val lockName = "prioritization-${messagePrioritization.group}-$correlationId"
+ val lockName = "prioritize::${messagePrioritization.group}::$correlationId"
val clusterLock = clusterService.clusterLock(lockName)
clusterLock.lock()
if (!clusterLock.isLocked()) throw BluePrintProcessorException("failed to lock($lockName)")
} else null
}
- /** Utility used to cluster unlock for message [messagePrioritization] */
- suspend fun prioritizationGroupUnLock(clusterService: BluePrintClusterService?, clusterLock: ClusterLock?) {
- if (clusterService != null && clusterService.clusterJoined() && clusterLock != null) {
+ /** Utility used to cluster unlock for message [clusterLock] */
+ suspend fun prioritizationGroupUnLock(clusterLock: ClusterLock?) {
+ if (clusterLock != null) {
clusterLock.unLock()
clusterLock.close()
}
}
+ /** Get the Kafka Supplier for processor lookup [name] and [prioritizationConfiguration] **/
fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration):
ProcessorSupplier<K, V> {
return ProcessorSupplier<K, V> {
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertiesService
import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintPropertyConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.PrioritizationMessageRepository
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessagePrioritizationSample
import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaBasicAuthMessageProducerService
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
import org.springframework.test.context.junit4.SpringRunner
import kotlin.test.Test
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
@RunWith(SpringRunner::class)
@DataJpaTest
)
open class MessagePrioritizationConsumerTest {
+ private val log = logger(MessagePrioritizationConsumerTest::class)
+
@Autowired
lateinit var applicationContext: ApplicationContext
@Autowired
lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+ @Autowired
+ lateinit var messagePrioritizationService: MessagePrioritizationService
+
@Autowired
lateinit var messagePrioritizationConsumer: MessagePrioritizationConsumer
}
}
+ @Test
+ fun testMessagePrioritizationService() {
+ runBlocking {
+ assertTrue(
+ ::messagePrioritizationService.isInitialized,
+ "failed to initialize messagePrioritizationService"
+ )
+
+ log.info("**************** without Correlation **************")
+ /** Checking without correlation */
+ MessagePrioritizationSample.sampleMessages(MessageState.NEW.name, 2).forEach {
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Same Group , with Correlation **************")
+ /** checking same group with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithSameCorrelation("same-group", MessageState.NEW.name, 2)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ log.info("**************** Different Type , with Correlation **************")
+ /** checking different type, with correlation */
+ MessagePrioritizationSample
+ .sampleMessageWithDifferentTypeSameCorrelation("group-typed", MessageState.NEW.name, 3)
+ .forEach {
+ delay(10)
+ messagePrioritizationService.prioritize(it)
+ }
+ }
+ }
+
@Test
fun testStartConsuming() {
runBlocking {
val spyStreamingConsumerService = spyk(streamingConsumerService)
coEvery { spyStreamingConsumerService.consume(any(), any()) } returns Unit
coEvery { spyStreamingConsumerService.shutDown() } returns Unit
- val messagePrioritizationConsumer = MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
+ val messagePrioritizationConsumer = MessagePrioritizationConsumer(
+ bluePrintMessageLibPropertyService
+ )
val spyMessagePrioritizationConsumer = spyk(messagePrioritizationConsumer)
// Test Topology
package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
import org.onap.ccsdk.cds.blueprintsprocessor.db.PrimaryDBLibGenericService
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageAggregateProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessageOutputProcessor
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.DefaultMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka.MessagePrioritizationConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.SampleMessagePrioritizationService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
*/
@Service
-open class SampleMessagePrioritizationConsumer(
+open class TestMessagePrioritizationService(messagePrioritizationStateService: MessagePrioritizationStateService) :
+ SampleMessagePrioritizationService(messagePrioritizationStateService)
+
+/** For Kafka Consumer **/
+@Service
+open class TestMessagePrioritizationConsumer(
bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
) : MessagePrioritizationConsumer(bluePrintMessageLibPropertyService)
@Service(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
-open class SampleMessagePrioritizeProcessor : MessagePrioritizeProcessor() {
- override fun getGroupCorrelationTypes(messagePrioritization: MessagePrioritization): List<String>? {
- return when (messagePrioritization.group) {
- "group-typed" -> arrayListOf("type-0", "type-1", "type-2")
- else -> null
- }
- }
-}
-
-@Service(MessagePrioritizationConstants.PROCESSOR_AGGREGATE)
-open class SampleMessageAggregateProcessor() : MessageAggregateProcessor()
-
-@Service(MessagePrioritizationConstants.PROCESSOR_OUTPUT)
-open class SampleMessageOutputProcessor : MessageOutputProcessor()
+open class TestMessagePrioritizeProcessor(
+ messagePrioritizationStateService: MessagePrioritizationStateService,
+ messagePrioritizationService: MessagePrioritizationService
+) : DefaultMessagePrioritizeProcessor(messagePrioritizationStateService, messagePrioritizationService)
import org.onap.ccsdk.cds.blueprintsprocessor.atomix.service.AtomixBluePrintClusterService
import org.onap.ccsdk.cds.blueprintsprocessor.core.service.BluePrintClusterService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
import org.springframework.context.annotation.Configuration
*/
fun BluePrintDependencyService.clusterService(): BluePrintClusterService =
instance(AtomixBluePrintClusterService::class)
+
+/** Optional Cluster Service, returns only if Cluster is enabled */
+fun BluePrintDependencyService.optionalClusterService(): BluePrintClusterService? {
+ return if (BluePrintConstants.CLUSTER_ENABLED) {
+ BluePrintDependencyService.clusterService()
+ } else null
+}