Formatting Code base with ktlint
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumer.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import org.apache.kafka.common.serialization.Serdes
20 import org.apache.kafka.streams.Topology
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29
/**
 * Builds and runs the Kafka Streams topology used for message prioritization.
 *
 * The consumer service is looked up through [bluePrintMessageLibPropertyService] and kept in
 * [streamingConsumerService] once [startConsuming] has been called.
 */
open class MessagePrioritizationConsumer(
    private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
) {

    private val log = logger(MessagePrioritizationConsumer::class)

    /** Consumer service in use; assigned by [startConsuming] and unset until then. */
    lateinit var streamingConsumerService: BlueprintMessageConsumerService

    /**
     * Looks up the message consumer service registered under [selector].
     *
     * @param selector key handed to the message lib property service.
     * @return the consumer service returned by the property service.
     */
    open fun consumerService(selector: String): BlueprintMessageConsumerService {
        return bluePrintMessageLibPropertyService
            .blueprintMessageConsumerService(selector)
    }

    /**
     * Creates the [KafkaStreamConsumerFunction] whose `createTopology` wires the prioritization
     * topology: source -> prioritize -> aggregate -> output processors, plus the expired and
     * output sinks.
     *
     * @param prioritizationConfiguration supplies the expired/output topic names and is passed to
     * every processor supplier.
     * @return a consumer function that builds the topology on demand.
     */
    open fun kafkaStreamConsumerFunction(
        prioritizationConfiguration: PrioritizationConfiguration
    ): KafkaStreamConsumerFunction {
        return object : KafkaStreamConsumerFunction {

            /**
             * Builds the topology.
             *
             * [messageConsumerProperties] is cast to [KafkaStreamsBasicAuthConsumerProperties];
             * any other implementation fails with a ClassCastException. Its `topic` value is
             * split on "," and every entry becomes a source topic.
             */
            override suspend fun createTopology(
                messageConsumerProperties: MessageConsumerProperties,
                additionalConfig: Map<String, Any>?
            ): Topology {

                val topology = Topology()
                val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
                    as KafkaStreamsBasicAuthConsumerProperties

                val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
                log.info("Consuming prioritization topics($topics)")

                topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())

                // Processor chain: SOURCE_INPUT -> PRIORITIZE -> AGGREGATE -> OUTPUT
                topology.addProcessor(
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                    bluePrintProcessorSupplier<ByteArray, ByteArray>(
                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                        prioritizationConfiguration
                    ),
                    MessagePrioritizationConstants.SOURCE_INPUT
                )

                topology.addProcessor(
                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                    bluePrintProcessorSupplier<String, String>(
                        MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                        prioritizationConfiguration
                    ),
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                )

                topology.addProcessor(
                    MessagePrioritizationConstants.PROCESSOR_OUTPUT,
                    bluePrintProcessorSupplier<String, String>(
                        MessagePrioritizationConstants.PROCESSOR_OUTPUT,
                        prioritizationConfiguration
                    ),
                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE
                )

                // Expired-message sink, fed only by the prioritize processor
                topology.addSink(
                    MessagePrioritizationConstants.SINK_EXPIRED,
                    prioritizationConfiguration.expiredTopic,
                    Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                )

                // To receive completed and error messages; fed by all three processors
                topology.addSink(
                    MessagePrioritizationConstants.SINK_OUTPUT,
                    prioritizationConfiguration.outputTopic,
                    Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                    MessagePrioritizationConstants.PROCESSOR_OUTPUT
                )

                // Output will be sent to the group-output topic from Processor API
                return topology
            }
        }
    }

    /**
     * Resolves the consumer service for `prioritizationConfiguration.inputTopicSelector`, stores it
     * in [streamingConsumerService] and starts consuming with the topology-building function.
     *
     * @param prioritizationConfiguration configuration used for both the selector lookup and the
     * topology.
     */
    suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
        streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)

        // Dynamic Consumer Function to create Topology
        val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
        streamingConsumerService.consume(null, consumerFunction)
    }

    /**
     * Shuts down the consumer service if one has been assigned; otherwise does nothing.
     */
    suspend fun shutDown() {
        // A lateinit property is never null; reading it before assignment throws
        // UninitializedPropertyAccessException, so test the initialization state instead.
        if (::streamingConsumerService.isInitialized) {
            streamingConsumerService.shutDown()
        }
    }
}