b611060f743d04468e87310f6a3295d37978a60f
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / MessagePrioritizationConsumer.kt
1 /*
2  * Copyright © 2018-2019 AT&T Intellectual Property.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
18
19 import org.apache.kafka.common.serialization.Serdes
20 import org.apache.kafka.streams.Topology
21 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
22 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
23 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
25 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
26 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
27 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.KafkaStreamConsumerFunction
28 import org.onap.ccsdk.cds.controllerblueprints.core.logger
29 import org.onap.ccsdk.cds.controllerblueprints.core.splitCommaAsList
30
31 open class MessagePrioritizationConsumer(
32     private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
33 ) {
34
35     private val log = logger(MessagePrioritizationConsumer::class)
36
37     lateinit var streamingConsumerService: BlueprintMessageConsumerService
38
39     open fun consumerService(selector: String): BlueprintMessageConsumerService {
40         return bluePrintMessageLibPropertyService
41             .blueprintMessageConsumerService(selector)
42     }
43
44     open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
45             KafkaStreamConsumerFunction {
46         return object : KafkaStreamConsumerFunction {
47
48             override suspend fun createTopology(
49                 messageConsumerProperties: MessageConsumerProperties,
50                 additionalConfig: Map<String, Any>?
51             ): Topology {
52
53                 val topology = Topology()
54                 val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
55                         as KafkaStreamsBasicAuthConsumerProperties
56
57                 val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
58                 log.info("Consuming prioritization topics($topics)")
59
60                 topology.addSource(MessagePrioritizationConstants.SOURCE_INPUT, *topics.toTypedArray())
61
62                 topology.addProcessor(
63                     MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
64                     bluePrintProcessorSupplier<ByteArray, ByteArray>(
65                         MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
66                         prioritizationConfiguration
67                     ),
68                     MessagePrioritizationConstants.SOURCE_INPUT
69                 )
70
71                 topology.addProcessor(
72                     MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
73                     bluePrintProcessorSupplier<String, String>(
74                         MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
75                         prioritizationConfiguration
76                     ),
77                     MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
78                 )
79
80                 topology.addProcessor(
81                     MessagePrioritizationConstants.PROCESSOR_OUTPUT,
82                     bluePrintProcessorSupplier<String, String>(
83                         MessagePrioritizationConstants.PROCESSOR_OUTPUT,
84                         prioritizationConfiguration
85                     ),
86                     MessagePrioritizationConstants.PROCESSOR_AGGREGATE
87                 )
88
89                 topology.addSink(
90                     MessagePrioritizationConstants.SINK_EXPIRED,
91                     prioritizationConfiguration.expiredTopic,
92                     Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
93                     MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
94                 )
95
96                 /** To receive completed and error messages */
97                 topology.addSink(
98                     MessagePrioritizationConstants.SINK_OUTPUT,
99                     prioritizationConfiguration.outputTopic,
100                     Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
101                     MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
102                     MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
103                     MessagePrioritizationConstants.PROCESSOR_OUTPUT
104                 )
105
106                 // Output will be sent to the group-output topic from Processor API
107                 return topology
108             }
109         }
110     }
111
112     suspend fun startConsuming(prioritizationConfiguration: PrioritizationConfiguration) {
113         streamingConsumerService = consumerService(prioritizationConfiguration.inputTopicSelector)
114
115         // Dynamic Consumer Function to create Topology
116         val consumerFunction = kafkaStreamConsumerFunction(prioritizationConfiguration)
117         streamingConsumerService.consume(null, consumerFunction)
118     }
119
120     suspend fun shutDown() {
121         if (streamingConsumerService != null) {
122             streamingConsumerService.shutDown()
123         }
124     }
125 }