967cc190eceda8a359a966d7898e481b54209bd7
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import org.apache.kafka.common.serialization.Serdes
20 import org.apache.kafka.streams.Topology
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29
/**
 * Wires up and starts the Kafka Streams topology used for message prioritization.
 *
 * The consumer service is looked up through [bluePrintMessageLibPropertyService] and is stored in
 * [streamingConsumerService] once [startConsuming] has been called.
 */
open class MessagePrioritizationConsumer(
        private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService) {

    private val log = logger(MessagePrioritizationConsumer::class)

    /** Consumer service assigned by [startConsuming]; it is uninitialized until then. */
    lateinit var streamingConsumerService: BlueprintMessageConsumerService

    /**
     * Resolves the message consumer service registered under [selector].
     *
     * @param selector selector passed to the message lib property service.
     * @return the consumer service returned by the property service.
     */
    open fun consumerService(selector: String): BlueprintMessageConsumerService {
        return bluePrintMessageLibPropertyService
                .blueprintMessageConsumerService(selector)
    }

    /**
     * Creates the consumer function that builds the prioritization topology.
     *
     * The topology reads from the configured input topics and chains three processors
     * (prioritize -> aggregate -> output). Two sinks are attached: the expired sink, fed by the
     * prioritize processor, and the output sink, fed by all three processors.
     *
     * @param prioritizationConfiguration supplies the expired and output topic names and is handed
     * to every processor supplier.
     * @return a [KafkaStreamConsumerFunction] whose `createTopology` builds the topology above.
     */
    open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration)
            : KafkaStreamConsumerFunction {
        return object : KafkaStreamConsumerFunction {

            override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
                                                additionalConfig: Map<String, Any>?): Topology {

                val topology = Topology()
                // The topic list lives on the Kafka Streams specific properties type; any other
                // properties implementation fails here with a ClassCastException.
                val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
                        as KafkaStreamsBasicAuthConsumerProperties

                // Topics are configured as a comma separated list. Each entry is trimmed so that a
                // value such as "a, b" does not produce a topic name with leading whitespace.
                val topics = kafkaStreamsBasicAuthConsumerProperties.topic
                        .split(",")
                        .map { it.trim() }
                log.info("Consuming prioritization topics($topics)")

                topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())

                topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                        bluePrintProcessorSupplier<ByteArray, ByteArray>(MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                                prioritizationConfiguration),
                        MessagePrioritizationConstants.SOURCE_INPUT)

                topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                        bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                                prioritizationConfiguration),
                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)

                topology.addProcessor(MessagePrioritizationConstants.PROCESSOR_OUTPUT,
                        bluePrintProcessorSupplier<String, String>(MessagePrioritizationConstants.PROCESSOR_OUTPUT,
                                prioritizationConfiguration),
                        MessagePrioritizationConstants.PROCESSOR_AGGREGATE)

                // Sink for the expired topic, fed only by the prioritize processor.
                topology.addSink(MessagePrioritizationConstants.SINK_EXPIRED,
                        prioritizationConfiguration.expiredTopic,
                        Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)

                /** To receive completed and error messages */
                topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT,
                        prioritizationConfiguration.outputTopic,
                        Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                        MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                        MessagePrioritizationConstants.PROCESSOR_OUTPUT)

                // Output will be sent to the group-output topic from Processor API
                return topology
            }
        }
    }

    /**
     * Resolves the consumer service for the configured input topic selector and starts consuming
     * with the topology produced by [kafkaStreamConsumerFunction].
     *
     * @param prioritizationConfiguration configuration providing the input topic selector and the
     * topology settings.
     */
    suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
        streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)

        // Dynamic Consumer Function to create Topology
        val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
        streamingConsumerService.consume(null, consumerFunction)
    }

    /**
     * Shuts down the streaming consumer service if [startConsuming] has assigned one; otherwise
     * this is a no-op.
     */
    suspend fun shutDown() {
        // A lateinit property never compares equal to null; reading it before it is assigned
        // throws UninitializedPropertyAccessException, so the initialization state is checked
        // through the property reference instead.
        if (::streamingConsumerService.isInitialized) {
            streamingConsumerService.shutDown()
        }
    }
}