fb7cfd110dbc3369d636bc170c1e9c350b652b48
[ccsdk/cds.git] /
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
18
19 import org.apache.kafka.common.serialization.Serdes
20 import org.apache.kafka.streams.Topology
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
23 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
28 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
29 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
30 import org.onap.ccsdk.cds.controllerblueprints.core.logger
31 import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
32
/**
 * Starts and stops the Kafka Streams consumer used for message prioritization.
 *
 * The consumer service is resolved through [bluePrintMessageLibPropertyService] using the
 * input topic selector found in the [PrioritizationConfiguration], and is driven by a
 * topology built in [kafkaStreamConsumerFunction].
 */
open class MessagePrioritizationConsumer(
    private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
) {

    private val log = logger(MessagePrioritizationConsumer::class)

    /** Consumer service in use; assigned by [startConsuming] and unset until then. */
    lateinit var streamingConsumerService: BlueprintMessageConsumerService

    /** Resolves the message consumer service registered under [selector]. */
    open fun consumerService(selector: String): BlueprintMessageConsumerService =
        bluePrintMessageLibPropertyService.blueprintMessageConsumerService(selector)

    /**
     * Builds the [KafkaStreamConsumerFunction] that creates the prioritization topology.
     *
     * @param prioritizationConfiguration configuration carrying the kafka settings.
     * @throws BluePrintProcessorException if the configuration has no kafka configuration.
     */
    open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
        KafkaStreamConsumerFunction {
        return object : KafkaStreamConsumerFunction {

            // Resolved when the function object is created, so a missing kafka
            // configuration fails before any topology is requested.
            val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
                ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")

            /**
             * Wires source -> prioritize processor -> sink.
             *
             * [messageConsumerProperties] is cast to [KafkaStreamsBasicAuthConsumerProperties];
             * its comma separated `topic` value supplies the source topics.
             */
            override suspend fun createTopology(
                messageConsumerProperties: MessageConsumerProperties,
                additionalConfig: Map<String, Any>?
            ): Topology {

                val topology = Topology()
                val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
                    as KafkaStreamsBasicAuthConsumerProperties

                val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
                log.info("Consuming prioritization topics($topics)")

                topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())

                topology.addProcessor(
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                    bluePrintProcessorSupplier<ByteArray, ByteArray>(
                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                        prioritizationConfiguration
                    ),
                    MessagePrioritizationConstants.SOURCE_INPUT
                )

                /** To receive completed and error messages */
                topology.addSink(
                    MessagePrioritizationConstants.SINK_OUTPUT,
                    kafkaConsumerConfiguration.outputTopic,
                    Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                )

                // Output will be sent to the group-output topic from Processor API
                return topology
            }
        }
    }

    /**
     * Resolves the consumer service for the configured input topic selector and starts
     * consuming with the prioritization topology.
     *
     * @throws BluePrintProcessorException if the configuration has no kafka configuration.
     */
    suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {

        val kafkaConsumerConfiguration = prioritizationConfiguration.kafkaConfiguration
            ?: throw BluePrintProcessorException("failed to get kafka consumer configuration")

        streamingConsumerService = consumerService(kafkaConsumerConfiguration.inputTopicSelector)

        // Dynamic Consumer Function to create Topology
        val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
        streamingConsumerService.consume(null, consumerFunction)
    }

    /**
     * Shuts down the consumer service if one has been assigned; otherwise does nothing.
     */
    suspend fun shutDown() {
        // A lateinit property is never null; reading it before assignment throws
        // UninitializedPropertyAccessException, so test initialization instead.
        if (::streamingConsumerService.isInitialized) {
            streamingConsumerService.shutDown()
        }
    }
}