Reorganize kafka module 24/90524/11
authorkjaniak <kornel.janiak@nokia.com>
Wed, 26 Jun 2019 13:57:29 +0000 (15:57 +0200)
committerkjaniak <kornel.janiak@nokia.com>
Tue, 2 Jul 2019 12:53:54 +0000 (14:53 +0200)
Change-Id: I2eb7a8a6e92c9d89586b877f4cae438497b62ae2
Issue-ID: DCAEGEN2-1635
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
19 files changed:
pom.xml
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt [moved from sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSource.kt with 57% similarity]
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt [new file with mode: 0644]
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt [new file with mode: 0644]
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/kafka.kt [moved from sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaConsumer.kt with 77% similarity]
sources/hv-collector-kafka-consumer/pom.xml
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt [new file with mode: 0644]
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/Metrics.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumer.kt
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt [new file with mode: 0644]
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/state/OffsetConsumerTest.kt
sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt [deleted file]
sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt [new file with mode: 0644]
sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt [deleted file]
sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactoryTest.kt [moved from sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/impl/KafkaSourceTest.kt with 62% similarity]

diff --git a/pom.xml b/pom.xml
index 30545e8..2098e6f 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
     </modules>
 
     <properties>
+        <coroutines.version>1.3.0-M2</coroutines.version>
         <kotlin.version>1.3.31</kotlin.version>
         <arrow.version>0.9.0</arrow.version>
         <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
             <dependency>
                 <groupId>org.jetbrains.kotlinx</groupId>
                 <artifactId>kotlinx-coroutines-core</artifactId>
-                <version>1.1.1</version>
+                <version>${coroutines.version}</version>
             </dependency>
             <dependency>
                 <groupId>com.google.code.gson</groupId>
                 <version>3.1.7.RELEASE</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.jetbrains.kotlinx</groupId>
+                <artifactId>kotlinx-coroutines-test</artifactId>
+                <version>${coroutines.version}</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 </project>
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.kafka.impl
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
-import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.kafka.receiver.KafkaReceiver
@@ -44,31 +39,12 @@ internal class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteAr
     companion object {
         private val logger = Logger(KafkaSource::class)
 
-        private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
-        private const val USERNAME = "admin"
-        private const val PASSWORD = "admin_secret"
-        private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
-        private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
-
         fun create(bootstrapServers: String, topics: Set<String>) =
                 KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
 
         fun createReceiverOptions(bootstrapServers: String,
                                   topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
-            val props = mapOf<String, Any>(
-                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
-                    ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-consumer",
-                    ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-consumers",
-                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
-                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
-                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
-                    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000",
-
-
-                    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT,
-                    SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM,
-                    SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG
-            )
+            val props = KafkaPropertiesFactory.create(bootstrapServers)
             return ReceiverOptions.create<ByteArray, ByteArray>(props)
                     .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
                     .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
index 8a7aafb..6ee640a 100644 (file)
@@ -20,8 +20,7 @@
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.dcae.collectors.veshv.kafka.api.ConsumerFactory
-import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.util.concurrent.ConcurrentLinkedQueue
 
@@ -41,9 +40,10 @@ internal class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArr
 
 internal interface ConsumerStateProvider {
     fun currentState(): ConsumerState
+    fun reset()
 }
 
-internal class Consumer : KafkaConsumer, ConsumerStateProvider {
+internal class Consumer : ConsumerStateProvider {
 
     private var consumedMessages: ConcurrentLinkedQueue<ByteArray> = ConcurrentLinkedQueue()
 
@@ -51,7 +51,7 @@ internal class Consumer : KafkaConsumer, ConsumerStateProvider {
 
     override fun reset() = consumedMessages.clear()
 
-    override fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
+    fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
         consumedMessages.add(record.value())
     }
@@ -63,9 +63,19 @@ internal class Consumer : KafkaConsumer, ConsumerStateProvider {
 
 internal class DcaeAppConsumerFactory(private val kafkaBootstrapServers: String) {
 
-    private val consumerProvider = { Consumer() }
-
     fun createConsumersFor(topics: Set<String>) =
-            ConsumerFactory.createConsumersForTopics(kafkaBootstrapServers, topics, consumerProvider)
-                    .mapValues { it.value as Consumer }
+            KafkaSource.create(kafkaBootstrapServers, topics).let { kafkaSource ->
+                val topicToConsumer = topics.associateWith { Consumer() }
+                kafkaSource.start()
+                        .map {
+                            val topic = it.topic()
+                            topicToConsumer.get(topic)?.update(it)
+                                    ?: logger.warn { "No consumer configured for topic $topic" }
+                        }.subscribe()
+                topicToConsumer
+            }
+
+    companion object {
+        private val logger = Logger(DcaeAppConsumerFactory::class)
+    }
 }
index a594215..728eb2f 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
-import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import reactor.kafka.receiver.ReceiverRecord
 
 
 /**
@@ -77,6 +75,3 @@ private fun assertState(cut: Consumer, vararg values: ByteArray) {
     assertThat(cut.currentState().messagesCount)
             .isEqualTo(values.size)
 }
-
-private fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
-        ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppConsumerFactoryTest.kt
new file mode 100644 (file)
index 0000000..71f31ab
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+
+object DcaeAppConsumerFactoryTest : Spek({
+    describe("DcaeAppConsumerFactory") {
+        val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41"
+        val dcaeAppConsumerFactory = DcaeAppConsumerFactory(kafkaBootstrapServers)
+
+        on("creation of consumer") {
+            val kafkaTopics = setOf("topic1", "topic2")
+            val consumer = dcaeAppConsumerFactory.createConsumersFor(kafkaTopics)
+
+            it("should create consumer") {
+                assertThat(consumer).isNotEmpty.hasSize(2)
+                assertThat(consumer).containsOnlyKeys("topic1", "topic2")
+            }
+        }
+
+        on("empty kafkaTopics set") {
+            val emptyKafkaTopics = emptySet<String>()
+            val consumer = dcaeAppConsumerFactory.createConsumersFor(emptyKafkaTopics)
+
+            it("should not create consumer") {
+                assertThat(consumer).isEmpty()
+            }
+        }
+
+
+    }
+})
\ No newline at end of file
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
new file mode 100644 (file)
index 0000000..5bfbc91
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import com.nhaarman.mockitokotlin2.doNothing
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import reactor.core.publisher.Flux
+import reactor.kafka.receiver.KafkaReceiver
+import reactor.kafka.receiver.ReceiverOffset
+import reactor.kafka.receiver.ReceiverRecord
+import reactor.test.StepVerifier
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since August 2018
+ */
+internal class KafkaSourceTest : Spek({
+
+    describe("KafkaSource"){
+        given("mocked Kafka Receiver"){
+            val mockedKafkaReceiver = mock<KafkaReceiver<ByteArray, ByteArray>>()
+            val mockedReceiverRecord = mock<ReceiverRecord<ByteArray, ByteArray>>()
+            whenever(mockedKafkaReceiver.receive()).thenReturn(Flux.just(mockedReceiverRecord))
+            on("function that starts KafkaSource") {
+                val mockedReceiverOffset = mock<ReceiverOffset>()
+                whenever(mockedReceiverRecord.receiverOffset()).thenReturn(mockedReceiverOffset)
+                doNothing().`when`(mockedReceiverOffset).acknowledge()
+
+                val testedFunction = { KafkaSource(mockedKafkaReceiver).start() }
+                it("should emmit receiver record") {
+                    StepVerifier.create(testedFunction())
+                            .expectSubscription()
+                            .expectNext(mockedReceiverRecord)
+                            .expectComplete()
+                            .verify()
+                }
+            }
+        }
+    }
+
+    given("parameters for factory methods") {
+        val servers = "kafka1:9080,kafka2:9080"
+        val topics = setOf("topic1", "topic2")
+
+        on("createReceiverOptions call with topics set") {
+            val options = KafkaSource.createReceiverOptions(servers, topics)
+            it("should generate options with provided topics") {
+                assertThat(options!!.subscriptionTopics()).contains("topic1", "topic2")
+            }
+        }
+
+        on("create call"){
+            val kafkaSource = KafkaSource.create(servers, topics)
+            it("should generate KafkaSource object") {
+                assertThat(kafkaSource).isInstanceOf(KafkaSource::class.java)
+            }
+        }
+    }
+
+})
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.kafka.api
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
 import org.apache.kafka.clients.consumer.ConsumerRecord
+import reactor.kafka.receiver.ReceiverRecord
 
-interface KafkaConsumer {
-    fun reset()
-    fun update(record: ConsumerRecord<ByteArray, ByteArray>)
-}
+internal fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
+        ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
index f09a3a2..7ffb5eb 100644 (file)
       <version>${project.parent.version}</version>
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <scope>runtime</scope>
+    </dependency>
     <dependency>
       <groupId>io.micrometer</groupId>
       <artifactId>micrometer-registry-prometheus</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.projectreactor.netty</groupId>
+      <artifactId>reactor-netty</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.jetbrains.kotlin</groupId>
       <artifactId>kotlin-stdlib-jdk8</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
+      <groupId>org.jetbrains.kotlinx</groupId>
+      <artifactId>kotlinx-coroutines-core</artifactId>
+      <scope>compile</scope>
     </dependency>
     <dependency>
-      <groupId>ch.qos.logback</groupId>
-      <artifactId>logback-classic</artifactId>
-      <scope>runtime</scope>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
     </dependency>
     <dependency>
-      <groupId>io.projectreactor.netty</groupId>
-      <artifactId>reactor-netty</artifactId>
+      <groupId>org.jetbrains.kotlinx</groupId>
+      <artifactId>kotlinx-coroutines-test</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 </project>
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSource.kt
new file mode 100644 (file)
index 0000000..dd24345
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
+
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.isActive
+import kotlinx.coroutines.launch
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
+
+internal class KafkaSource(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
+                           private val topics: Set<String>,
+                           private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
+    suspend fun start(offsetConsumer: OffsetConsumer, updateInterval: Long = 500L): Job =
+            GlobalScope.launch(dispatcher) {
+                kafkaConsumer.subscribe(topics)
+                val topicPartitions = kafkaConsumer.assignment()
+                while (isActive) {
+                    kafkaConsumer.endOffsets(topicPartitions)
+                            .forEach { (topicPartition, offset) ->
+                                offsetConsumer.update(topicPartition, offset)
+                            }
+                    kafkaConsumer.commitSync()
+                    delay(updateInterval)
+                }
+            }
+}
index 2fabf30..e576a88 100644 (file)
@@ -19,7 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.kafkaconsumer.metrics
 
-internal interface Metrics {
-    fun notifyOffsetChanged(offset: Long)
+interface Metrics {
+    fun notifyOffsetChanged(offset: Long, topic: String, partition: Int = 0)
     fun notifyMessageTravelTime(messageSentTimeMicros: Long)
-}
\ No newline at end of file
+}
index 748e43f..da1225e 100644 (file)
@@ -41,7 +41,8 @@ internal class MicrometerMetrics constructor(
         registry.scrape()
     }
 
-    override fun notifyOffsetChanged(offset: Long) {
+    override fun notifyOffsetChanged(offset: Long, topic: String, partition: Int) {
+        // TODO use topic and partition
         currentOffset.lazySet(offset)
     }
 
index 2c6707f..1481a22 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.kafkaconsumer.state
 
-import org.apache.kafka.clients.consumer.ConsumerRecord
-import org.onap.dcae.collectors.veshv.kafka.api.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
 
-internal class OffsetConsumer(private val metrics: Metrics): KafkaConsumer  {
+internal class OffsetConsumer(private val metrics: Metrics) {
 
-    override fun update(record: ConsumerRecord<ByteArray, ByteArray>) {
-        val offset = record.offset()
-        logger.trace { "Current consumer offset $offset" }
-        metrics.notifyOffsetChanged(offset)
+    fun update(topicPartition: TopicPartition, offset: Long) {
+        logger.trace {
+            "Current consumer offset $offset for topic ${topicPartition.topic()} " +
+                    "on partition ${topicPartition.partition()}"
+        }
+        metrics.notifyOffsetChanged(offset, topicPartition.topic(), topicPartition.partition())
     }
 
-    override fun reset() = Unit
+    fun reset() = Unit
 
     companion object {
-        private val logger = Logger(OffsetConsumer::class)
+        val logger = Logger(OffsetConsumer::class)
     }
 }
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/KafkaSourceTest.kt
new file mode 100644 (file)
index 0000000..b0eb7a5
--- /dev/null
@@ -0,0 +1,154 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.impl
+
+import com.nhaarman.mockitokotlin2.argumentCaptor
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.times
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.verifyZeroInteractions
+import com.nhaarman.mockitokotlin2.whenever
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.test.TestCoroutineDispatcher
+import kotlinx.coroutines.test.runBlockingTest
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.TopicPartition
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.kafkaconsumer.state.OffsetConsumer
+
+@ExperimentalCoroutinesApi
+object KafkaSourceTest : Spek({
+    given("KafkaSource") {
+        val testDispatcher = TestCoroutineDispatcher()
+        val mockedKafkaConsumer = mock<KafkaConsumer<ByteArray, ByteArray>>()
+        afterEachTest {
+            testDispatcher.cleanupTestCoroutines()
+        }
+        given("single topicName and partition") {
+            val topics = setOf("topicName")
+            val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
+            val mockedOffsetConsumer = mock<OffsetConsumer>()
+            on("started KafkaSource") {
+                val topicPartition = createTopicPartition("topicName")
+                val topicPartitions = setOf(topicPartition)
+                whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+                whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+                        .thenReturn(mapOf<TopicPartition, Long>(topicPartition to newOffset))
+
+                runBlockingTest {
+                    val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+                    job.cancelAndJoin()
+                }
+
+                it("should call update function on topicName") {
+                    verify(mockedOffsetConsumer).update(topicPartition, newOffset)
+                }
+            }
+        }
+
+        given("two topics with partition") {
+            val topics = setOf(topicName1, topicName2)
+            val kafkaSource = KafkaSource(mockedKafkaConsumer, topics, testDispatcher)
+            val mockedOffsetConsumer = mock<OffsetConsumer>()
+
+            on("started KafkaSource for two iteration of while loop") {
+                val topicPartitionArgumentCaptor = argumentCaptor<TopicPartition>()
+                val offsetArgumentCaptor = argumentCaptor<Long>()
+                val topicPartitionArgumentCaptorAfterInterval = argumentCaptor<TopicPartition>()
+                val offsetArgumentCaptorAfterInterval = argumentCaptor<Long>()
+                val topicPartition1 = createTopicPartition(topicName1)
+                val topicPartition2 = createTopicPartition(topicName2)
+                val topicPartitions = setOf(topicPartition1, topicPartition2)
+                whenever(mockedKafkaConsumer.assignment()).thenReturn(topicPartitions)
+                val partitionToOffset1 =
+                        mapOf(topicPartition1 to newOffset,
+                                topicPartition2 to anotherNewOffset)
+                val partitionToOffset2 =
+                        mapOf(topicPartition1 to anotherNewOffset,
+                                topicPartition2 to newOffset)
+                whenever(mockedKafkaConsumer.endOffsets(topicPartitions))
+                        .thenReturn(partitionToOffset1, partitionToOffset2)
+
+                runBlockingTest {
+                    val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+                    verify(mockedOffsetConsumer, times(topicsAmount)).update(topicPartitionArgumentCaptor.capture(),
+                            offsetArgumentCaptor.capture())
+
+                    testDispatcher.advanceTimeBy(updateIntervalInMs)
+
+                    verify(mockedOffsetConsumer, times(topicsAmountAfterInterval))
+                            .update(topicPartitionArgumentCaptorAfterInterval.capture(), offsetArgumentCaptorAfterInterval.capture())
+
+                    it("should calls update function with proper arguments - before interval") {
+                        assertThat(topicPartitionArgumentCaptor.firstValue).isEqualTo(topicPartition1)
+                        assertThat(offsetArgumentCaptor.firstValue).isEqualTo(newOffset)
+                        assertThat(topicPartitionArgumentCaptor.secondValue).isEqualTo(topicPartition2)
+                        assertThat(offsetArgumentCaptor.secondValue).isEqualTo(anotherNewOffset)
+                    }
+                    it("should calls update function with proper arguments - after interval") {
+                        assertThat(topicPartitionArgumentCaptorAfterInterval.thirdValue).isEqualTo(topicPartition1)
+                        assertThat(offsetArgumentCaptorAfterInterval.thirdValue).isEqualTo(anotherNewOffset)
+                        assertThat(topicPartitionArgumentCaptorAfterInterval.lastValue).isEqualTo(topicPartition2)
+                        assertThat(offsetArgumentCaptorAfterInterval.lastValue).isEqualTo(newOffset)
+                    }
+                    job.cancelAndJoin()
+                }
+            }
+        }
+
+        given("empty topicName list") {
+            val emptyTopics = emptySet<String>()
+            val kafkaSource = KafkaSource(mockedKafkaConsumer, emptyTopics, testDispatcher)
+            val mockedOffsetConsumer = mock<OffsetConsumer>()
+
+            on("call of function start") {
+                val emptyTopicPartitions = setOf(null)
+                whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
+                whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
+                        .thenReturn(emptyMap())
+
+                runBlockingTest {
+                    val job = kafkaSource.start(mockedOffsetConsumer, updateIntervalInMs)
+                    job.cancelAndJoin()
+                }
+
+                it("should not interact with OffsetConsumer") {
+                    verifyZeroInteractions(mockedOffsetConsumer)
+                }
+            }
+        }
+
+    }
+})
+
+private const val updateIntervalInMs = 10L
+private const val partitionNumber = 0
+private const val newOffset = 2L
+private const val anotherNewOffset = 10L
+private const val topicName1 = "topicName1"
+private const val topicName2 = "topicName2"
+private const val topicsAmount = 2
+private const val topicsAmountAfterInterval = 4
+fun createTopicPartition(topic: String) = TopicPartition(topic, partitionNumber)
\ No newline at end of file
index 4158786..96ba588 100644 (file)
@@ -74,7 +74,7 @@ object MicrometerMetricsTest : Spek({
             val offset = 966L
 
             it("should update $gaugeName") {
-                cut.notifyOffsetChanged(offset)
+                cut.notifyOffsetChanged(offset, "sample_topic", 1)
 
                 registry.verifyGauge(gaugeName) {
                     assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
index 6fb42d8..242f27b 100644 (file)
@@ -21,10 +21,10 @@ package org.onap.dcae.collectors.veshv.kafkaconsumer.state
 
 import com.nhaarman.mockitokotlin2.mock
 import com.nhaarman.mockitokotlin2.verify
-import com.nhaarman.mockitokotlin2.whenever
-import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
@@ -33,16 +33,20 @@ object OffsetConsumerTest : Spek({
     describe("OffsetConsumer with metrics") {
         val mockedMetrics = mock<Metrics>()
         val offsetConsumer = OffsetConsumer(mockedMetrics)
+        given("topicName with partition"){
+            val topicPartition = TopicPartition(topicName, partitionNumber)
 
-        on("new update method call") {
-            val consumerRecord = mock<ConsumerRecord<ByteArray, ByteArray>>()
-            whenever(consumerRecord.offset()).thenReturn(1)
+            on("new update method call") {
+                offsetConsumer.update(topicPartition, newOffset)
 
-            offsetConsumer.update(consumerRecord)
-
-            it("should notify message offset metric") {
-                verify(mockedMetrics).notifyOffsetChanged(1)
+                it("should notify message newOffset metric") {
+                    verify(mockedMetrics).notifyOffsetChanged(newOffset, topicName, partitionNumber)
+                }
             }
         }
     }
 })
+
+private const val partitionNumber = 1
+private const val newOffset: Long = 99
+private const val topicName = "sample-topicName"
diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactory.kt
deleted file mode 100644 (file)
index 88eb8ce..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.kafka.api
-
-import org.onap.dcae.collectors.veshv.kafka.impl.KafkaSource
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-
-typealias ConsumerProvider = () -> KafkaConsumer
-
-object ConsumerFactory {
-    private val logger = Logger(ConsumerFactory::class)
-
-    fun createConsumersForTopics(kafkaBootstrapServers: String,
-                                 kafkaTopics: Set<String>,
-                                 consumerProvider: ConsumerProvider): Map<String, KafkaConsumer> =
-            KafkaSource.create(kafkaBootstrapServers, kafkaTopics).let { kafkaSource ->
-                val topicToConsumer = kafkaTopics.associate { it to consumerProvider() }
-                kafkaSource.start()
-                        .map {
-                            val topic = it.topic()
-                            topicToConsumer.get(topic)?.update(it)
-                                    ?: logger.warn { "No consumer configured for topic $topic" }
-                        }.subscribe()
-                topicToConsumer
-            }
-}
diff --git a/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt b/sources/hv-collector-kafka/src/main/kotlin/org/onap/dcae/collectors/veshv/kafka/api/KafkaPropertiesFactory.kt
new file mode 100644 (file)
index 0000000..b654274
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafka.api
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+
+object KafkaPropertiesFactory {
+    private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
+    private const val USERNAME = "admin"
+    private const val PASSWORD = "admin_secret"
+    private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
+    private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+
+    fun create(bootstrapServers: String): Map<String, Any> {
+        val props = mapOf<String, Any>(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
+                ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-consumer",
+                ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-consumers",
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+                ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "3000",
+
+
+                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT,
+                SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM,
+                SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG
+        )
+        return props
+    }
+}
\ No newline at end of file
diff --git a/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt b/sources/hv-collector-kafka/src/test/kotlin/org/onap/dcae/collectors/veshv/kafka/api/ConsumerFactoryTest.kt
deleted file mode 100644 (file)
index a8ba421..0000000
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.kafka.api
-
-import com.nhaarman.mockitokotlin2.mock
-import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.entry
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-
-object ConsumerFactoryTest : Spek({
-    describe("ConsumerFactory") {
-        val kafkaBootstrapServers = "0.0.0.0:40,0.0.0.1:41"
-        given("consumer provider"){
-            val mockedKafkaConsumer = mock<KafkaConsumer>()
-            val consumerProvider = { mockedKafkaConsumer }
-            on("creation of consumer") {
-                val kafkaTopics = setOf("topic1", "topic2")
-                val consumer = ConsumerFactory.createConsumersForTopics(
-                        kafkaBootstrapServers,
-                        kafkaTopics,
-                        consumerProvider)
-                it("should create consumer"){
-                    assertThat(consumer).isNotEmpty.hasSize(2)
-                    assertThat(consumer).contains(entry("topic1", mockedKafkaConsumer),
-                            entry("topic2", mockedKafkaConsumer))
-                }
-            }
-            on("empty kafkaTopics set"){
-                val emptyKafkaTopics = emptySet<String>()
-                val consumer = ConsumerFactory.createConsumersForTopics(
-                        kafkaBootstrapServers,
-                        emptyKafkaTopics,
-                        consumerProvider)
-
-                it("should not create consumer"){
-                    assertThat(consumer).isEmpty()
-                }
-            }
-        }
-
-
-    }
-})
\ No newline at end of file
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.kafka.impl
+package org.onap.dcae.collectors.veshv.kafka.api
 
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.plain.internals.PlainSaslServer
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
- * @since August 2018
- */
-internal class KafkaSourceTest : Spek({
+internal class KafkaPropertiesFactoryTest : Spek({
     val servers = "kafka1:9080,kafka2:9080"
-    val topics = setOf("topic1", "topic2")
 
-    describe("receiver options") {
-        val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable()
+    describe("KafkaPropertiesFactory") {
+        val options = KafkaPropertiesFactory.create(servers)
 
         fun verifyProperty(key: String, expectedValue: Any) {
             it("should have $key option set") {
-                assertThat(options.consumerProperty(key))
+                assertThat(options.getValue(key))
                         .isEqualTo(expectedValue)
             }
         }
@@ -50,5 +49,15 @@ internal class KafkaSourceTest : Spek({
         verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
         verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
         verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+        verifyProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "3000")
+        verifyProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+        verifyProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
+        verifyProperty(SaslConfigs.SASL_JAAS_CONFIG, JAAS_CONFIG)
     }
-})
\ No newline at end of file
+})
+
+private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
+private const val USERNAME = "admin"
+private const val PASSWORD = "admin_secret"
+internal val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+internal const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"