Basic Ratpack API in DCAE APP Simulator 71/58571/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 12 Jun 2018 10:16:19 +0000 (12:16 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 05:36:34 +0000 (07:36 +0200)
Closes ONAP-266

Change-Id: Iaa000e976fcdc4274aa88ce7d0a6cd5866987680
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

hv-collector-dcae-app-simulator/pom.xml
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt [new file with mode: 0644]
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt [new file with mode: 0644]
hv-collector-dcae-app-simulator/src/main/resources/logback.xml [new file with mode: 0644]
pom.xml

index a7123e3..5796f1d 100644 (file)
             <artifactId>hv-collector-utils</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.ratpack</groupId>
+            <artifactId>ratpack-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor.kafka</groupId>
+            <artifactId>reactor-kafka</artifactId>
+        </dependency>
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
new file mode 100644 (file)
index 0000000..b0725d1
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.kafka.receiver.KafkaReceiver
+import reactor.kafka.receiver.ReceiverOptions
+import reactor.kafka.receiver.ReceiverRecord
+import java.util.*
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
+
+    private val consumedMessages = AtomicLong(0)
+
+    fun start(): Mono<Void> = Mono.create { sink ->
+        receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error)
+        receiver.receive().subscribe(this::update)
+    }
+
+    fun consumedMessages() = consumedMessages.get()
+
+    private fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+        consumedMessages.incrementAndGet()
+    }
+
+    companion object {
+        private val logger = Logger(KafkaSource::class)
+
+        fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
+            val props = HashMap<String, Any>()
+            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
+            props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator"
+            props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators"
+            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
+            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
+            props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
+            val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props)
+                    .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } }
+                    .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } }
+                    .subscription(topics)
+            return KafkaSource(KafkaReceiver.create(receiverOptions))
+        }
+    }
+}
\ No newline at end of file
index a03dbcd..75c28c4 100644 (file)
@@ -1,9 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp
 
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.slf4j.LoggerFactory
 
-private val logger = LoggerFactory.getLogger("Dcae simulator :: main")
+private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main"))
 
-fun main(args : Array<String>){
-    logger.info("Hello world!")
-}
\ No newline at end of file
+fun main(args: Array<String>) {
+    logger.info("Starting DCAE APP simulator")
+    val port = 8080
+    val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
+    val apiServer = ApiServer(messageSource)
+
+    messageSource.start()
+            .then(apiServer.start(port))
+            .block()
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
new file mode 100644 (file)
index 0000000..3f4e4fc
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
+
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import ratpack.handling.Chain
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class ApiServer(private val messageSource: KafkaSource) {
+    fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
+        RatpackServer.of { server ->
+            server.serverConfig(ServerConfig.embedded().port(port))
+                    .handlers(this::setupHandlers)
+        }
+    }.doOnNext(RatpackServer::start)
+
+    private fun setupHandlers(chain: Chain) {
+        chain.get("messages/count") { ctx ->
+            ctx.response.send(messageSource.consumedMessages().toString())
+        }
+    }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
new file mode 100644 (file)
index 0000000..48da3b1
--- /dev/null
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+  <property name="LOG_FILE"
+    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>
+        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+      </pattern>
+    </encoder>
+  </appender>
+
+    <appender name="ROLLING-FILE"
+      class="ch.qos.logback.core.rolling.RollingFileAppender">
+      <encoder>
+        <pattern>${FILE_LOG_PATTERN}</pattern>
+      </encoder>
+      <file>${LOG_FILE}</file>
+      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+        <maxFileSize>50MB</maxFileSize>
+        <maxHistory>30</maxHistory>
+        <totalSizeCap>10GB</totalSizeCap>
+      </rollingPolicy>
+    </appender>
+
+  <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
+  <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
+
+  <root level="INFO">
+      <appender-ref ref="CONSOLE"/>
+      <appender-ref ref="ROLLING-FILE"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6ac05ec..d2c5886 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                 <artifactId>javax.json</artifactId>
                 <version>1.1.2</version>
             </dependency>
+            <dependency>
+                <groupId>io.ratpack</groupId>
+                <artifactId>ratpack-core</artifactId>
+                <version>1.5.4</version>
+            </dependency>
 
             <!-- Test dependencies -->