Dockerize DCAE APP simulator 81/58581/1
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 12 Jun 2018 12:19:10 +0000 (14:19 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 2 Aug 2018 06:16:28 +0000 (08:16 +0200)
Closes ONAP-265
Closes ONAP-267

Change-Id: I394476cf7ba3851d663a2995dc7fe591dae5be41
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601

docker-compose.yml
hv-collector-client-simulator/pom.xml
hv-collector-dcae-app-simulator/Dockerfile [new file with mode: 0644]
hv-collector-dcae-app-simulator/pom.xml
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt [new file with mode: 0644]
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
hv-collector-main/pom.xml
pom.xml

index 0f0cca2..8db767c 100644 (file)
@@ -34,4 +34,12 @@ services:
     depends_on:
       - veshvcollector
     volumes:
-      - ./ssl/:/etc/ves-hv/
\ No newline at end of file
+      - ./ssl/:/etc/ves-hv/
+  dcae-app-simulator:
+    build:
+      context: hv-collector-dcae-app-simulator
+      dockerfile: Dockerfile
+    ports:
+      - "8080:8080/tcp"
+    depends_on:
+      - kafka
index 012bda5..86cdeca 100644 (file)
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-dependency-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <id>copy-internal-deps</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>copy-dependencies</goal>
-                                </goals>
-                                <configuration>
-                                    <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
-                                    <includeGroupIds>${project.parent.groupId}</includeGroupIds>
-                                    <includeScope>runtime</includeScope>
-                                </configuration>
-                            </execution>
-                            <execution>
-                                <id>copy-external-deps</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>copy-dependencies</goal>
-                                </goals>
-                                <configuration>
-                                    <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
-                                    <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
-                                    <includeScope>runtime</includeScope>
-                                </configuration>
-                            </execution>
-                        </executions>
                     </plugin>
-                    <!--
+                    <!-- TODO: unskip docker
                     <plugin>
                         <groupId>io.fabric8</groupId>
                         <artifactId>docker-maven-plugin</artifactId>
diff --git a/hv-collector-dcae-app-simulator/Dockerfile b/hv-collector-dcae-app-simulator/Dockerfile
new file mode 100644 (file)
index 0000000..8e1d7e8
--- /dev/null
@@ -0,0 +1,15 @@
+FROM openjdk:10-jre-slim
+
+LABEL copyright="Copyright (C) 2018 NOKIA"
+LABEL license.name="The Apache Software License, Version 2.0"
+LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
+LABEL maintainer="Nokia Wroclaw ONAP Team"
+
+EXPOSE 8080
+
+WORKDIR /opt/ves-hv-dcae-app-simulator
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"]
+CMD ["--kafka-bootstrap-servers", "TODO", "--kafka-topics", "TODO", "--api-port", "TODO"]
+COPY target/libs/external/* ./
+COPY target/libs/internal/* ./
+COPY target/hv-collector-dcae-app-simulator-*.jar ./
index 5796f1d..046f5ed 100644 (file)
             </plugin>
         </plugins>
     </build>
-
+    <profiles>
+        <profile>
+            <id>docker</id>
+            <activation>
+                <property>
+                    <name>!skipDocker</name>
+                </property>
+            </activation>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-dependency-plugin</artifactId>
+                    </plugin>
+                    <!-- TODO: unskip docker
+                    <plugin>
+                        <groupId>io.fabric8</groupId>
+                        <artifactId>docker-maven-plugin</artifactId>
+                    </plugin>
+                    -->
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
     <dependencies>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <groupId>io.projectreactor.kafka</groupId>
             <artifactId>reactor-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java-util</artifactId>
+        </dependency>
         <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
index b0725d1..f7703b8 100644 (file)
@@ -25,9 +25,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Mono
 import reactor.kafka.receiver.KafkaReceiver
 import reactor.kafka.receiver.ReceiverOptions
-import reactor.kafka.receiver.ReceiverRecord
 import java.util.*
-import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,17 +33,10 @@ import java.util.concurrent.atomic.AtomicLong
  */
 class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
 
-    private val consumedMessages = AtomicLong(0)
-
-    fun start(): Mono<Void> = Mono.create { sink ->
-        receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error)
-        receiver.receive().subscribe(this::update)
-    }
-
-    fun consumedMessages() = consumedMessages.get()
-
-    private fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
-        consumedMessages.incrementAndGet()
+    fun start(): Mono<Consumer> = Mono.create { sink ->
+        val consumer = Consumer()
+        receiver.receive().subscribe(consumer::update)
+        sink.success(consumer)
     }
 
     companion object {
@@ -60,10 +51,10 @@ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
             props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
             props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
             val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props)
-                    .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } }
-                    .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } }
+                    .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
+                    .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
                     .subscription(topics)
             return KafkaSource(KafkaReceiver.create(receiverOptions))
         }
     }
-}
\ No newline at end of file
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
new file mode 100644 (file)
index 0000000..5f0fe7f
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+
+import reactor.core.publisher.Mono
+import reactor.kafka.receiver.ReceiverRecord
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+
+class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?)
+
+interface ConsumerStateProvider {
+    fun currentState(): Mono<ConsumerState>
+}
+
+class Consumer : ConsumerStateProvider {
+    private var msgCount = 0L
+    private var lastKey: ByteArray? = null
+    private var lastValue: ByteArray? = null
+
+    override fun currentState(): Mono<ConsumerState> = Mono.create { sink ->
+        val state = synchronized(this) {
+            ConsumerState(msgCount, lastKey, lastValue)
+        }
+        sink.success(state)
+    }
+
+    fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+        synchronized(this) {
+            msgCount++
+            lastKey = record.key()
+            lastValue = record.value()
+        }
+    }
+}
index 75c28c4..170806a 100644 (file)
@@ -29,10 +29,10 @@ private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main"))
 fun main(args: Array<String>) {
     logger.info("Starting DCAE APP simulator")
     val port = 8080
-    val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
-    val apiServer = ApiServer(messageSource)
 
-    messageSource.start()
-            .then(apiServer.start(port))
+    KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
+            .start()
+            .map(::ApiServer)
+            .flatMap { it.start(port) }
             .block()
 }
index 3f4e4fc..fcb8e13 100644 (file)
@@ -19,7 +19,9 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
 
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.ves.VesEventV5.VesEvent
 import ratpack.handling.Chain
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
@@ -29,7 +31,9 @@ import reactor.core.publisher.Mono
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class ApiServer(private val messageSource: KafkaSource) {
+class ApiServer(private val consumerState: ConsumerStateProvider) {
+    private val jsonPrinter = JsonFormat.printer()
+
     fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
         RatpackServer.of { server ->
             server.serverConfig(ServerConfig.embedded().port(port))
@@ -38,8 +42,35 @@ class ApiServer(private val messageSource: KafkaSource) {
     }.doOnNext(RatpackServer::start)
 
     private fun setupHandlers(chain: Chain) {
-        chain.get("messages/count") { ctx ->
-            ctx.response.send(messageSource.consumedMessages().toString())
-        }
+        chain
+                .get("messages/count") { ctx ->
+                    ctx.response.contentType(CONTENT_TEXT)
+                    consumerState.currentState()
+                            .map { it.msgCount.toString() }
+                            .subscribe(ctx.response::send)
+                }
+
+                .get("messages/last/key") { ctx ->
+                    ctx.response.contentType(CONTENT_JSON)
+                    consumerState.currentState()
+                            .map { it.lastKey }
+                            .map { VesEvent.CommonEventHeader.parseFrom(it) }
+                            .map { jsonPrinter.print(it) }
+                            .subscribe(ctx.response::send)
+                }
+
+                .get("messages/last/value") { ctx ->
+                    ctx.response.contentType(CONTENT_JSON)
+                    consumerState.currentState()
+                            .map { it.lastValue }
+                            .map { VesEvent.parseFrom(it) }
+                            .map { jsonPrinter.print(it) }
+                            .subscribe(ctx.response::send)
+                }
+    }
+
+    companion object {
+        private const val CONTENT_TEXT = "text/plain"
+        private const val CONTENT_JSON = "application/json"
     }
 }
index f7f8eb7..a5a35ba 100644 (file)
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-dependency-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <id>copy-internal-deps</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>copy-dependencies</goal>
-                                </goals>
-                                <configuration>
-                                    <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
-                                    <includeGroupIds>${project.parent.groupId}</includeGroupIds>
-                                    <includeScope>runtime</includeScope>
-                                </configuration>
-                            </execution>
-                            <execution>
-                                <id>copy-external-deps</id>
-                                <phase>package</phase>
-                                <goals>
-                                    <goal>copy-dependencies</goal>
-                                </goals>
-                                <configuration>
-                                    <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
-                                    <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
-                                    <includeScope>runtime</includeScope>
-                                </configuration>
-                            </execution>
-                        </executions>
                     </plugin>
-                    <!--
+                    <!-- TODO: unskip docker
                     <plugin>
                         <groupId>io.fabric8</groupId>
                         <artifactId>docker-maven-plugin</artifactId>
diff --git a/pom.xml b/pom.xml
index af61407..9e33ec5 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
         <skipAnalysis>true</skipAnalysis>
 
         <!-- Docker -->
-        <skipDocker>true</skipDocker>
+        <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
         <docker-image.name>ves-hv-collector/${project.artifactId}</docker-image.name>
         <docker-image.namespace>onap</docker-image.namespace>
     </properties>
                         </dependency>
                     </dependencies>
                 </plugin>
-                <plugin>
-                    <groupId>org.apache.maven.plugins</groupId>
-                    <artifactId>maven-dependency-plugin</artifactId>
-                    <version>3.1.1</version>
-                </plugin>
             </plugins>
         </pluginManagement>
         <extensions>
                                 </images>
                             </configuration>
                         </plugin>
-
+                        <plugin>
+                            <groupId>org.apache.maven.plugins</groupId>
+                            <artifactId>maven-dependency-plugin</artifactId>
+                            <version>3.1.1</version>
+                            <executions>
+                                <execution>
+                                    <id>copy-internal-deps</id>
+                                    <phase>package</phase>
+                                    <goals>
+                                        <goal>copy-dependencies</goal>
+                                    </goals>
+                                    <configuration>
+                                        <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
+                                        <includeGroupIds>${project.parent.groupId}</includeGroupIds>
+                                        <includeScope>runtime</includeScope>
+                                    </configuration>
+                                </execution>
+                                <execution>
+                                    <id>copy-external-deps</id>
+                                    <phase>package</phase>
+                                    <goals>
+                                        <goal>copy-dependencies</goal>
+                                    </goals>
+                                    <configuration>
+                                        <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
+                                        <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
+                                        <includeScope>runtime</includeScope>
+                                    </configuration>
+                                </execution>
+                            </executions>
+                        </plugin>
                     </plugins>
                 </pluginManagement>
             </build>
                 <artifactId>protobuf-java</artifactId>
                 <version>${protobuf.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.google.protobuf</groupId>
+                <artifactId>protobuf-java-util</artifactId>
+                <version>${protobuf.version}</version>
+            </dependency>
             <dependency>
                 <groupId>commons-cli</groupId>
                 <artifactId>commons-cli</artifactId>