/*
* Copyright © 2019 IBM.
+ * Modifications Copyright © 2018-2019 AT&T Intellectual Property.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.streams.Topology
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+/** Consumer Function Interfaces */
+interface ConsumerFunction
interface BlueprintMessageConsumerService {
+ suspend fun subscribe(): Channel<String> {
+ return subscribe(null)
+ }
+
/** Subscribe to the Kafka channel with [additionalConfig] */
suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
/** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+ /** Consume and execute dynamic function [consumerFunction] */
+ suspend fun consume(consumerFunction: ConsumerFunction) {
+ consume(null, consumerFunction)
+ }
+
+ /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+ suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+ throw BluePrintProcessorException("Not Implemented")
+ }
+
+ /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+ suspend fun consume(
+ topics: List<String>,
+ additionalConfig: Map<String, Any>?,
+ consumerFunction: ConsumerFunction
+ ) {
+ throw BluePrintProcessorException("Not Implemented")
+ }
+
/** close the channel, consumer and other resources */
suspend fun shutDown()
+}
+
+/** Consumer dynamic implementation interface */
+interface KafkaConsumerRecordsFunction : ConsumerFunction {
+
+ suspend fun invoke(
+ messageConsumerProperties: MessageConsumerProperties,
+ consumer: Consumer<*, *>,
+ consumerRecords: ConsumerRecords<*, *>
+ )
+}
-}
\ No newline at end of file
+interface KafkaStreamConsumerFunction : ConsumerFunction {
+ suspend fun createTopology(
+ messageConsumerProperties: MessageConsumerProperties,
+ additionalConfig: Map<String, Any>?
+ ): Topology
+}