* limitations under the License.
*/
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.kafka
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.MessagePrioritizationConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
}
open fun kafkaStreamConsumerFunction(prioritizationConfiguration: PrioritizationConfiguration):
- KafkaStreamConsumerFunction {
+ KafkaStreamConsumerFunction {
return object : KafkaStreamConsumerFunction {
override suspend fun createTopology(
val topology = Topology()
val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
- as KafkaStreamsBasicAuthConsumerProperties
+ as KafkaStreamsBasicAuthConsumerProperties
val topics = kafkaStreamsBasicAuthConsumerProperties.topic.splitCommaAsList()
log.info("Consuming prioritization topics($topics)")
MessagePrioritizationConstants.SOURCE_INPUT
)
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- bluePrintProcessorSupplier<String, String>(
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- prioritizationConfiguration
- ),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
- topology.addProcessor(
- MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- bluePrintProcessorSupplier<String, String>(
- MessagePrioritizationConstants.PROCESSOR_OUTPUT,
- prioritizationConfiguration
- ),
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE
- )
-
- topology.addSink(
- MessagePrioritizationConstants.SINK_EXPIRED,
- prioritizationConfiguration.expiredTopic,
- Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
- )
-
/** To receive completed and error messages */
topology.addSink(
MessagePrioritizationConstants.SINK_OUTPUT,
prioritizationConfiguration.outputTopic,
Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
- MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
- MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
- MessagePrioritizationConstants.PROCESSOR_OUTPUT
+ MessagePrioritizationConstants.PROCESSOR_PRIORITIZE
)
// Output will be sent to the group-output topic from Processor API