Rest endpoint for message Prioritization
[ccsdk/cds.git] / ms / blueprintsprocessor / functions / message-prioritizaion / src / main / kotlin / org / onap / ccsdk / cds / blueprintsprocessor / functions / message / prioritization / kafka / MessagePrioritizationConsumer.kt
  * limitations under the License.
  */
 
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
 
 import org.apache.kafka.common.serialization.Serdes
 import org.apache.kafka.streams.Topology
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
@@ -42,7 +43,7 @@ open class MessagePrioritizationConsumer(
     }
 
     open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
-            KafkaStreamConsumerFunction {
+        KafkaStreamConsumerFunction {
         return object : KafkaStreamConsumerFunction {
 
             override suspend fun createTopology(
@@ -52,7 +53,7 @@ open class MessagePrioritizationConsumer(
 
                 val topology = Topology()
                 val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
-                        as KafkaStreamsBasicAuthConsumerProperties
+                    as KafkaStreamsBasicAuthConsumerProperties
 
                 val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
                 log.info("Consuming prioritization topics($topics)")
@@ -68,39 +69,12 @@ open class MessagePrioritizationConsumer(
                     MessagePrioritizationConstants.SOURCE_INPUT
                 )
 
-                topology.addProcessor(
-                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
-                    bluePrintProcessorSupplier<String, String>(
-                        MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
-                        prioritizationConfiguration
-                    ),
-                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
-                )
-
-                topology.addProcessor(
-                    MessagePrioritizationConstants.PROCESSOR_OUTPUT,
-                    bluePrintProcessorSupplier<String, String>(
-                        MessagePrioritizationConstants.PROCESSOR_OUTPUT,
-                        prioritizationConfiguration
-                    ),
-                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE
-                )
-
-                topology.addSink(
-                    MessagePrioritizationConstants.SINK_EXPIRED,
-                    prioritizationConfiguration.expiredTopic,
-                    Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
-                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
-                )
-
                 /** To receive completed and error messages */
                 topology.addSink(
                     MessagePrioritizationConstants.SINK_OUTPUT,
                     prioritizationConfiguration.outputTopic,
                     Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
-                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
-                    MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
-                    MessagePrioritizationConstants.PROCESSOR_OUTPUT
+                    MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
                 )
 
                 // Output will be sent to the group-output topic from Processor API