Add Kafka Streams consumer service 16/98216/2
authorBrinda Santh <bs2796@att.com>
Fri, 8 Nov 2019 21:41:07 +0000 (16:41 -0500)
committerBrinda Santh <bs2796@att.com>
Fri, 8 Nov 2019 22:07:32 +0000 (17:07 -0500)
Issue-ID: CCSDK-1914
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I8d2b51c66e1304decadbb55656fe8a0b4c018feb

ms/blueprintsprocessor/modules/commons/message-lib/pom.xml
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibConfiguration.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BluePrintMessageLibPropertyService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageProducerService.kt
ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt [new file with mode: 0644]
ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt [new file with mode: 0644]

index f92a8f4..8d08ae8 100644 (file)
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams-test-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.springframework.kafka</groupId>
             <artifactId>spring-kafka-test</artifactId>
index 27a444b..ecffa28 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -62,5 +63,6 @@ class MessageLibConstants {
         const val PROPERTY_MESSAGE_CONSUMER_PREFIX = "blueprintsprocessor.messageconsumer."
         const val PROPERTY_MESSAGE_PRODUCER_PREFIX = "blueprintsprocessor.messageproducer."
         const val TYPE_KAFKA_BASIC_AUTH = "kafka-basic-auth"
+        const val TYPE_KAFKA_STREAMS_BASIC_AUTH = "kafka-streams-basic-auth"
     }
 }
\ No newline at end of file
index 184e85b..d0c3d5a 100644 (file)
@@ -17,6 +17,8 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message
 
+import org.apache.kafka.streams.StreamsConfig
+
 /** Producer Properties **/
 open class MessageProducerProperties
 
@@ -25,12 +27,27 @@ open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties()
     lateinit var bootstrapServers: String
     var topic: String? = null
     var clientId: String? = null
+    // strongest producing guarantee
+    var acks: String = "all"
+    var retries: Int = 0
+    // ensure we don't push duplicates
+    var enableIdempotence: Boolean = true
 }
 
 /** Consumer Properties **/
 
 open class MessageConsumerProperties
 
+open class KafkaStreamsConsumerProperties : MessageConsumerProperties() {
+    lateinit var bootstrapServers: String
+    lateinit var applicationId: String
+    lateinit var topic: String
+    var autoOffsetReset: String = "latest"
+    var processingGuarantee: String = StreamsConfig.EXACTLY_ONCE
+}
+
+open class KafkaStreamsBasicAuthConsumerProperties : KafkaStreamsConsumerProperties()
+
 open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
     lateinit var bootstrapServers: String
     lateinit var groupId: String
index 7c56ea4..563da94 100644 (file)
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -101,6 +102,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
                 kafkaBasicAuthMessageConsumerProperties(prefix)
             }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+                kafkaStreamsBasicAuthMessageConsumerProperties(prefix)
+            }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
@@ -113,6 +117,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
             MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
                 JacksonUtils.readValue(jsonNode, KafkaBasicAuthMessageConsumerProperties::class.java)!!
             }
+            MessageLibConstants.TYPE_KAFKA_STREAMS_BASIC_AUTH -> {
+                JacksonUtils.readValue(jsonNode, KafkaStreamsBasicAuthConsumerProperties::class.java)!!
+            }
             else -> {
                 throw BluePrintProcessorException("Message adaptor($type) is not supported")
             }
@@ -126,6 +133,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
             is KafkaBasicAuthMessageConsumerProperties -> {
                 return KafkaBasicAuthMessageConsumerService(messageConsumerProperties)
             }
+            is KafkaStreamsBasicAuthConsumerProperties -> {
+                return KafkaStreamsBasicAuthConsumerService(messageConsumerProperties)
+            }
             else -> {
                 throw BluePrintProcessorException("couldn't get Message client service for")
             }
@@ -137,4 +147,9 @@ open class BluePrintMessageLibPropertyService(private var bluePrintProperties: B
                 prefix, KafkaBasicAuthMessageConsumerProperties::class.java)
     }
 
+    private fun kafkaStreamsBasicAuthMessageConsumerProperties(prefix: String): KafkaStreamsBasicAuthConsumerProperties {
+        return bluePrintProperties.propertyBeanType(
+                prefix, KafkaStreamsBasicAuthConsumerProperties::class.java)
+    }
+
 }
index 8bcc758..716fda6 100644 (file)
@@ -20,6 +20,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 import kotlinx.coroutines.channels.Channel
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerRecords
+import org.apache.kafka.streams.Topology
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
 
@@ -61,4 +62,9 @@ interface BlueprintMessageConsumerService {
 interface KafkaConsumerRecordsFunction : ConsumerFunction {
     suspend fun invoke(messageConsumerProperties: MessageConsumerProperties, consumer: Consumer<*, *>,
                        consumerRecords: ConsumerRecords<*, *>)
+}
+
+interface KafkaStreamConsumerFunction : ConsumerFunction {
+    suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
+                               additionalConfig: Map<String, Any>?): Topology
 }
\ No newline at end of file
index 42adcd7..ad9a594 100644 (file)
@@ -65,9 +65,9 @@ class KafkaBasicAuthMessageProducerService(
             headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
         }
         val callback = Callback { metadata, exception ->
-            log.info("message published offset(${metadata.offset()}, headers :$headers )")
+            log.trace("message published to(${metadata.topic()}), offset(${metadata.offset()}), headers :$headers")
         }
-        messageTemplate().send(record, callback).get()
+        messageTemplate().send(record, callback)
         return true
     }
 
@@ -77,6 +77,8 @@ class KafkaBasicAuthMessageProducerService(
         configProps[BOOTSTRAP_SERVERS_CONFIG] = messageProducerProperties.bootstrapServers
         configProps[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
         configProps[VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
+        configProps[ACKS_CONFIG] = messageProducerProperties.acks
+        configProps[ENABLE_IDEMPOTENCE_CONFIG] = messageProducerProperties.enableIdempotence
         if (messageProducerProperties.clientId != null) {
             configProps[CLIENT_ID_CONFIG] = messageProducerProperties.clientId!!
         }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerService.kt
new file mode 100644 (file)
index 0000000..229e462
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.streams.KafkaStreams
+import org.apache.kafka.streams.StreamsConfig
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import java.util.*
+
+open class KafkaStreamsBasicAuthConsumerService(private val messageConsumerProperties: KafkaStreamsBasicAuthConsumerProperties)
+    : BlueprintMessageConsumerService {
+
+    val log = logger(KafkaStreamsBasicAuthConsumerService::class)
+    lateinit var kafkaStreams: KafkaStreams
+
+    private fun streamsConfig(additionalConfig: Map<String, Any>? = null): Properties {
+        val configProperties = Properties()
+        configProperties[StreamsConfig.APPLICATION_ID_CONFIG] = messageConsumerProperties.applicationId
+        configProperties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = messageConsumerProperties.bootstrapServers
+        configProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = messageConsumerProperties.autoOffsetReset
+        configProperties[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = messageConsumerProperties.processingGuarantee
+        // TODO("Security Implementation based on type")
+        /** add or override already set properties */
+        additionalConfig?.let { configProperties.putAll(it) }
+        /** Create Kafka consumer */
+        return configProperties
+    }
+
+    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+        throw BluePrintProcessorException("not implemented")
+    }
+
+    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+        throw BluePrintProcessorException("not implemented")
+    }
+
+    override suspend fun consume(additionalConfig: Map<String, Any>?, consumerFunction: ConsumerFunction) {
+        val streamsConfig = streamsConfig(additionalConfig)
+        val kafkaStreamConsumerFunction = consumerFunction as KafkaStreamConsumerFunction
+        val topology = kafkaStreamConsumerFunction.createTopology(messageConsumerProperties, additionalConfig)
+        kafkaStreams = KafkaStreams(topology, streamsConfig)
+        kafkaStreams.cleanUp()
+        kafkaStreams.start()
+        kafkaStreams.localThreadsMetadata().forEach { data -> log.info("Topology : $data") }
+    }
+
+    override suspend fun shutDown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close()
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsBasicAuthConsumerServiceTest.kt
new file mode 100644 (file)
index 0000000..e2a31f4
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.apache.kafka.streams.state.Stores
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BluePrintProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertyConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BluePrintMessageLibConfiguration
+import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.test.annotation.DirtiesContext
+import org.springframework.test.context.ContextConfiguration
+import org.springframework.test.context.TestPropertySource
+import org.springframework.test.context.junit4.SpringRunner
+import kotlin.test.assertNotNull
+
+
+@RunWith(SpringRunner::class)
+@DirtiesContext
+@ContextConfiguration(classes = [BluePrintMessageLibConfiguration::class,
+    BlueprintPropertyConfiguration::class, BluePrintProperties::class])
+@TestPropertySource(properties =
+[
+    "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
+    "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
+    "blueprintsprocessor.messageproducer.sample.topic=default-stream-topic",
+    "blueprintsprocessor.messageproducer.sample.clientId=default-client-id",
+
+    "blueprintsprocessor.messageconsumer.stream-consumer.type=kafka-streams-basic-auth",
+    "blueprintsprocessor.messageconsumer.stream-consumer.bootstrapServers=127.0.0.1:9092",
+    "blueprintsprocessor.messageconsumer.stream-consumer.applicationId=test-streams-application",
+    "blueprintsprocessor.messageconsumer.stream-consumer.topic=default-stream-topic"
+
+])
+class KafkaStreamsBasicAuthConsumerServiceTest {
+    @Autowired
+    lateinit var bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService
+
+    @Test
+    fun testProperties() {
+        val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
+        assertNotNull(blueprintMessageConsumerService, "failed to get blueprintMessageProducerService")
+    }
+
+    /** Integration Kafka Testing, Enable and use this test case only for local desktop testing with real kafka broker */
+    //@Test
+    fun testKafkaStreamingMessageConsumer() {
+        runBlocking {
+            val streamingConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
+
+            // Dynamic Consumer Function to create Topology
+            val consumerFunction = object : KafkaStreamConsumerFunction {
+                override suspend fun createTopology(messageConsumerProperties: MessageConsumerProperties,
+                                                    additionalConfig: Map<String, Any>?): Topology {
+                    val topology = Topology()
+                    val kafkaStreamsBasicAuthConsumerProperties = messageConsumerProperties
+                            as KafkaStreamsBasicAuthConsumerProperties
+
+                    val topics = kafkaStreamsBasicAuthConsumerProperties.topic.split(",")
+                    topology.addSource("Source", *topics.toTypedArray())
+                    // Processor Supplier
+                    val firstProcessorSupplier = object : ProcessorSupplier<ByteArray, ByteArray> {
+                        override fun get(): Processor<ByteArray, ByteArray> {
+                            return FirstProcessor()
+                        }
+                    }
+                    val changelogConfig: MutableMap<String, String> = hashMapOf()
+                    changelogConfig.put("min.insync.replicas", "1")
+
+                    // Store Buolder
+                    val countStoreSupplier = Stores.keyValueStoreBuilder(
+                            Stores.persistentKeyValueStore("PriorityMessageState"),
+                            Serdes.String(),
+                            PriorityMessageSerde())
+                            .withLoggingEnabled(changelogConfig)
+
+                    topology.addProcessor("FirstProcessor", firstProcessorSupplier, "Source")
+                    topology.addStateStore(countStoreSupplier, "FirstProcessor")
+                    topology.addSink("SINK", "default-stream-topic-out", Serdes.String().serializer(),
+                            PriorityMessageSerde().serializer(), "FirstProcessor")
+                    return topology
+                }
+            }
+
+            /** Send message with every 1 sec */
+            val blueprintMessageProducerService = bluePrintMessageLibPropertyService
+                    .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
+            launch {
+                repeat(5) {
+                    delay(1000)
+                    val headers: MutableMap<String, String> = hashMapOf()
+                    headers["id"] = it.toString()
+                    blueprintMessageProducerService.sendMessageNB(message = "this is my message($it)",
+                            headers = headers)
+                }
+            }
+            streamingConsumerService.consume(null, consumerFunction)
+            delay(10000)
+            streamingConsumerService.shutDown()
+        }
+    }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/MockKafkaTopologyComponents.kt
new file mode 100644 (file)
index 0000000..4db9c77
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+
+import org.apache.kafka.common.serialization.Deserializer
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.common.serialization.Serializer
+import org.apache.kafka.streams.processor.Processor
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.state.KeyValueStore
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
+import java.io.Serializable
+import java.nio.charset.Charset
+import java.util.*
+
+class PriorityMessage : Serializable {
+    lateinit var id: String
+    lateinit var requestMessage: String
+}
+
+open class PriorityMessageSerde : Serde<PriorityMessage> {
+
+    override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+    }
+
+    override fun close() {
+    }
+
+    override fun deserializer(): Deserializer<PriorityMessage> {
+        return object : Deserializer<PriorityMessage> {
+            override fun deserialize(topic: String, data: ByteArray): PriorityMessage {
+                return JacksonUtils.readValue(String(data), PriorityMessage::class.java)
+                        ?: throw BluePrintProcessorException("failed to convert")
+            }
+
+            override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+            }
+
+            override fun close() {
+            }
+        }
+    }
+
+    override fun serializer(): Serializer<PriorityMessage> {
+        return object : Serializer<PriorityMessage> {
+            override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+            }
+
+            override fun serialize(topic: String?, data: PriorityMessage): ByteArray {
+                return data.asJsonString().toByteArray(Charset.defaultCharset())
+            }
+
+            override fun close() {
+            }
+        }
+    }
+}
+
+
+class FirstProcessor : Processor<ByteArray, ByteArray> {
+
+    private val log = logger(FirstProcessor::class)
+
+    private lateinit var context: ProcessorContext
+    private lateinit var kvStore: KeyValueStore<String, PriorityMessage>
+
+    override fun process(key: ByteArray, value: ByteArray) {
+        log.info("First Processor key(${String(key)} : value(${String(value)})")
+        val newMessage = PriorityMessage().apply {
+            id = UUID.randomUUID().toString()
+            requestMessage = String(value)
+        }
+        kvStore.put(newMessage.id, newMessage)
+        this.context.forward(newMessage.id, newMessage)
+    }
+
+    override fun init(context: ProcessorContext) {
+        log.info("init... ${context.keySerde()}, ${context.valueSerde()}")
+        this.context = context
+        this.kvStore = context.getStateStore("PriorityMessageState") as KeyValueStore<String, PriorityMessage>
+    }
+
+    override fun close() {
+        log.info("Close...")
+    }
+}
\ No newline at end of file