Formatting Code base with ktlint
[ccsdk/cds.git] / ms / blueprintsprocessor / modules / commons / message-lib / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / message / service / BlueprintMessageConsumerService.kt
index 25f0bf4..f74abcd 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.streams.Topology
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+
+/** Consumer Function Interfaces */
+interface ConsumerFunction
 
 interface BlueprintMessageConsumerService {
 
+    suspend fun subscribe(): Channel<String> {
+        return subscribe(null)
+    }
+
     /** Subscribe to the Kafka channel with [additionalConfig] */
     suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
 
     /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
     suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
 
+    /** Consume and execute dynamic function [consumerFunction] */
+    suspend fun consume(consumerFunction: ConsumerFunction) {
+        consume(null, consumerFunction)
+    }
+
+    /** Consume with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+    suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+        throw BluePrintProcessorException("Not Implemented")
+    }
+
+    /** Consume the [topics] with [additionalConfig], so that we can execute dynamic function [consumerFunction] */
+    suspend fun consume(
+        topics: List<String>,
+        additionalConfig: Map<String, Any>?,
+        consumerFunction: ConsumerFunction
+    ) {
+        throw BluePrintProcessorException("Not Implemented")
+    }
+
     /** close the channel, consumer and other resources */
     suspend fun shutDown()
+}
+
+/** Consumer dynamic implementation interface */
+interface KafkaConsumerRecordsFunction : ConsumerFunction {
+
+    suspend fun invoke(
+        messageConsumerProperties: MessageConsumerProperties,
+        consumer: Consumer<*, *>,
+        consumerRecords: ConsumerRecords<*, *>
+    )
+}
 
-}
\ No newline at end of file
+interface KafkaStreamConsumerFunction : ConsumerFunction {
+    suspend fun createTopology(
+        messageConsumerProperties: MessageConsumerProperties,
+        additionalConfig: Map<String, Any>?
+    ): Topology
+}