2 * Copyright © 2019 IBM.
3 * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
20 import kotlinx.coroutines.channels.Channel
21 import org.apache.kafka.clients.consumer.Consumer
22 import org.apache.kafka.clients.consumer.ConsumerRecords
23 import org.apache.kafka.streams.Topology
24 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
25 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
/**
 * Marker interface for dynamic consumer functions.
 *
 * It declares no members of its own; it is the common parameter type accepted by the
 * `consume` overloads of [BlueprintMessageConsumerService], and the more specific
 * consumer function interfaces in this file extend it.
 */
interface ConsumerFunction
/**
 * Contract for a message consumer service.
 *
 * The `subscribe` overloads return a [Channel] of [String] messages, while the `consume`
 * overloads hand processing over to a caller supplied [ConsumerFunction].
 */
interface BlueprintMessageConsumerService {

    /** Subscribe without any additional configuration; delegates to the overload taking a `null` config. */
    suspend fun subscribe(): Channel<String> = subscribe(additionalConfig = null)

    /** Subscribe to the Kafka channel, applying the optional [additionalConfig]. */
    suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>

    /** Subscribe to the Kafka channel for the dynamic [topics], applying the optional [additionalConfig]. */
    suspend fun subscribe(
        topics: List<String>,
        additionalConfig: Map<String, Any>? = null
    ): Channel<String>

    /** Consume and execute the dynamic function [consumerFunction] without any additional configuration. */
    suspend fun consume(consumerFunction: ConsumerFunction) {
        consume(additionalConfig = null, consumerFunction = consumerFunction)
    }

    /**
     * Consume with [additionalConfig] so that the dynamic function [consumerFunction] can be executed.
     *
     * The default implementation is a placeholder that always fails; implementations that
     * support this mode are expected to override it.
     *
     * @throws BluePrintProcessorException always, unless overridden.
     */
    suspend fun consume(
        additionalConfig: Map<String, Any>?,
        consumerFunction: ConsumerFunction
    ) {
        throw BluePrintProcessorException("Not Implemented")
    }

    /**
     * Consume the given [topics] with [additionalConfig] so that the dynamic function
     * [consumerFunction] can be executed.
     *
     * The default implementation is a placeholder that always fails; implementations that
     * support this mode are expected to override it.
     *
     * @throws BluePrintProcessorException always, unless overridden.
     */
    suspend fun consume(
        topics: List<String>,
        additionalConfig: Map<String, Any>?,
        consumerFunction: ConsumerFunction
    ) {
        throw BluePrintProcessorException("Not Implemented")
    }

    /** Close the channel, the consumer and any other resources held by the service. */
    suspend fun shutDown()
}
/**
 * Dynamic consumer implementation that works directly on Kafka consumer records.
 *
 * NOTE(review): presumably called by the consuming service for each polled batch of
 * records - confirm against the service implementation.
 */
interface KafkaConsumerRecordsFunction : ConsumerFunction {

    /**
     * Handle a set of records.
     *
     * @param messageConsumerProperties properties of the message consumer in use.
     * @param consumer the Kafka [Consumer] associated with the records.
     * @param consumerRecords the Kafka [ConsumerRecords] to process.
     */
    suspend fun invoke(
        messageConsumerProperties: MessageConsumerProperties,
        consumer: Consumer<*, *>,
        consumerRecords: ConsumerRecords<*, *>
    )
}
/** Dynamic consumer implementation that supplies a Kafka Streams [Topology]. */
interface KafkaStreamConsumerFunction : ConsumerFunction {

    /**
     * Create the stream processing topology.
     *
     * @param messageConsumerProperties properties of the message consumer in use.
     * @param additionalConfig optional extra configuration; may be `null`.
     * @return the [Topology] to run.
     */
    suspend fun createTopology(
        messageConsumerProperties: MessageConsumerProperties,
        additionalConfig: Map<String, Any>?
    ): Topology
}