2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
19 import org.apache.kafka.common.serialization.Serdes
20 import org.apache.kafka.streams.Topology
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
22 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
27 import org.onap.ccsdk.cds.controllerblueprints.core.logger
/**
 * Creates and drives the Kafka Streams consumer used for message prioritization.
 *
 * The consumer service is looked up through the injected [BluePrintMessageLibPropertyService],
 * and the stream topology is produced on demand by [kafkaStreamConsumerFunction].
 */
open class MessagePrioritizationConsumer(
    private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
) {

    private val log = logger(MessagePrioritizationConsumer::class)

    /** Consumer service in use; assigned by [startConsuming] and unset until that has been called. */
    lateinit var streamingConsumerService: BlueprintMessageConsumerService

    /**
     * Resolves the message consumer service registered under [selector].
     *
     * @param selector selector passed to the message library property service.
     * @return the consumer service returned by the property service.
     */
    open fun consumerService(selector: String): BlueprintMessageConsumerService =
        bluePrintMessageLibPropertyService.blueprintMessageConsumerService(selector)

    /**
     * Builds the [KafkaStreamConsumerFunction] whose topology wires the prioritization pipeline:
     *
     * source -> prioritize -> aggregate -> output -> output sink, with a second sink for
     * expired messages attached directly to the prioritize processor.
     *
     * @param prioritizationConfiguration supplies the expired/output topic names and is handed
     * to every processor supplier.
     * @return a consumer function that creates the topology when invoked.
     */
    open fun kafkaStreamConsumerFunction(
        prioritizationConfiguration: PrioritizationConfiguration
    ): KafkaStreamConsumerFunction {
        return object : KafkaStreamConsumerFunction {

            /**
             * @param messageConsumerProperties must be a [KafkaStreamsBasicAuthConsumerProperties];
             * any other type fails the cast with a ClassCastException.
             * @param additionalConfig not used by this topology.
             */
            override suspend fun createTopology(
                messageConsumerProperties: MessageConsumerProperties,
                additionalConfig: Map<String, Any>?
            ): Topology {
                val streamProperties = messageConsumerProperties as KafkaStreamsBasicAuthConsumerProperties

                // The topic property is a comma separated list. Trim each entry and drop blanks so
                // that values such as "a, b" or a trailing comma do not yield bogus topic names.
                val topics = streamProperties.topic
                    .split(",")
                    .map { it.trim() }
                    .filter { it.isNotEmpty() }
                log.info("Consuming prioritization topics($topics)")

                val topology = Topology()

                topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())

                // Processor chain: each processor is fed by the node named as the last argument.
                topology.addProcessor(
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                    bluePrintProcessorSupplier<ByteArray, ByteArray>(
                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                        prioritizationConfiguration
                    ),
                    MessagePrioritizationConstants.SOURCE_INPUT
                )

                topology.addProcessor(
                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                    bluePrintProcessorSupplier<String, String>(
                        MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                        prioritizationConfiguration
                    ),
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                )

                topology.addProcessor(
                    MessagePrioritizationConstants.PROCESSOR_OUTPUT,
                    bluePrintProcessorSupplier<String, String>(
                        MessagePrioritizationConstants.PROCESSOR_OUTPUT,
                        prioritizationConfiguration
                    ),
                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE
                )

                // Expired messages leave the pipeline straight from the prioritize processor.
                topology.addSink(
                    MessagePrioritizationConstants.SINK_EXPIRED,
                    prioritizationConfiguration.expiredTopic,
                    Serdes.String().serializer(),
                    MessagePrioritizationSerde().serializer(),
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                )

                topology.addSink(
                    MessagePrioritizationConstants.SINK_OUTPUT,
                    prioritizationConfiguration.outputTopic,
                    Serdes.String().serializer(),
                    MessagePrioritizationSerde().serializer(),
                    MessagePrioritizationConstants.PROCESSOR_OUTPUT
                )

                // Output will be sent to the group-output topic from Processor API
                return topology
            }
        }
    }

    /**
     * Resolves the consumer service for the configured input topic selector, stores it in
     * [streamingConsumerService] and starts consuming with the prioritization topology.
     *
     * @param prioritizationConfiguration configuration providing the input topic selector and
     * the settings used to build the topology.
     */
    suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
        streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)

        // Dynamic Consumer Function to create Topology
        val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
        streamingConsumerService.consume(null, consumerFunction)
    }

    /**
     * Shuts down the consumer service if [startConsuming] has assigned one; otherwise does nothing.
     */
    suspend fun shutDown() {
        // A lateinit property cannot be compared with null: reading it before assignment throws
        // UninitializedPropertyAccessException. Check the initialization state explicitly.
        if (::streamingConsumerService.isInitialized) {
            streamingConsumerService.shutDown()
        }
    }
}