depends_on:
- veshvcollector
volumes:
- - ./ssl/:/etc/ves-hv/
\ No newline at end of file
+ - ./ssl/:/etc/ves-hv/
+ dcae-app-simulator:
+ build:
+ context: hv-collector-dcae-app-simulator
+ dockerfile: Dockerfile
+ ports:
+ - "8080:8080/tcp"
+ depends_on:
+ - kafka
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-internal-deps</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
- <includeGroupIds>${project.parent.groupId}</includeGroupIds>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- <execution>
- <id>copy-external-deps</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
- <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
</plugin>
- <!--
+ <!-- TODO: unskip docker
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
--- /dev/null
+FROM openjdk:10-jre-slim
+
+LABEL copyright="Copyright (C) 2018 NOKIA"
+LABEL license.name="The Apache Software License, Version 2.0"
+LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
+LABEL maintainer="Nokia Wroclaw ONAP Team"
+
+EXPOSE 8080
+
+WORKDIR /opt/ves-hv-dcae-app-simulator
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"]
+CMD ["--kafka-bootstrap-servers", "TODO", "--kafka-topics", "TODO", "--api-port", "TODO"]
+COPY target/libs/external/* ./
+COPY target/libs/internal/* ./
+COPY target/hv-collector-dcae-app-simulator-*.jar ./
</plugin>
</plugins>
</build>
-
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <activation>
+ <property>
+ <name>!skipDocker</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <!-- TODO: unskip docker
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ </plugin>
+ -->
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
<dependencies>
<dependency>
<groupId>${project.parent.groupId}</groupId>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
-import reactor.kafka.receiver.ReceiverRecord
import java.util.*
-import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
*/
class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
- private val consumedMessages = AtomicLong(0)
-
- fun start(): Mono<Void> = Mono.create { sink ->
- receiver.doOnConsumer { it.subscription() }.subscribe({ sink.success() }, sink::error)
- receiver.receive().subscribe(this::update)
- }
-
- fun consumedMessages() = consumedMessages.get()
-
- private fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
- consumedMessages.incrementAndGet()
+ fun start(): Mono<Consumer> = Mono.create { sink ->
+ val consumer = Consumer()
+ receiver.receive().subscribe(consumer::update)
+ sink.success(consumer)
}
companion object {
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props)
- .addAssignListener { partitions -> logger.debug { "onPartitionsAssigned $partitions" } }
- .addRevokeListener { partitions -> logger.debug { "onPartitionsRevoked $partitions" } }
+ .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
+ .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
.subscription(topics)
return KafkaSource(KafkaReceiver.create(receiverOptions))
}
}
-}
\ No newline at end of file
+}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+
+import reactor.core.publisher.Mono
+import reactor.kafka.receiver.ReceiverRecord
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+
+class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?)
+
+interface ConsumerStateProvider {
+ fun currentState(): Mono<ConsumerState>
+}
+
+class Consumer : ConsumerStateProvider {
+ private var msgCount = 0L
+ private var lastKey: ByteArray? = null
+ private var lastValue: ByteArray? = null
+
+ override fun currentState(): Mono<ConsumerState> = Mono.create { sink ->
+ val state = synchronized(this) {
+ ConsumerState(msgCount, lastKey, lastValue)
+ }
+ sink.success(state)
+ }
+
+ fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ synchronized(this) {
+ msgCount++
+ lastKey = record.key()
+ lastValue = record.value()
+ }
+ }
+}
fun main(args: Array<String>) {
logger.info("Starting DCAE APP simulator")
val port = 8080
- val messageSource = KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
- val apiServer = ApiServer(messageSource)
- messageSource.start()
- .then(apiServer.start(port))
+ KafkaSource.create("kafka:9092", setOf("ves_hvRanMeas"))
+ .start()
+ .map(::ApiServer)
+ .flatMap { it.start(port) }
.block()
}
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
+import com.google.protobuf.util.JsonFormat
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class ApiServer(private val messageSource: KafkaSource) {
+class ApiServer(private val consumerState: ConsumerStateProvider) {
+ private val jsonPrinter = JsonFormat.printer()
+
fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
RatpackServer.of { server ->
server.serverConfig(ServerConfig.embedded().port(port))
}.doOnNext(RatpackServer::start)
private fun setupHandlers(chain: Chain) {
- chain.get("messages/count") { ctx ->
- ctx.response.send(messageSource.consumedMessages().toString())
- }
+ chain
+ .get("messages/count") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ consumerState.currentState()
+ .map { it.msgCount.toString() }
+ .subscribe(ctx.response::send)
+ }
+
+ .get("messages/last/key") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ consumerState.currentState()
+ .map { it.lastKey }
+ .map { VesEvent.CommonEventHeader.parseFrom(it) }
+ .map { jsonPrinter.print(it) }
+ .subscribe(ctx.response::send)
+ }
+
+ .get("messages/last/value") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ consumerState.currentState()
+ .map { it.lastValue }
+ .map { VesEvent.parseFrom(it) }
+ .map { jsonPrinter.print(it) }
+ .subscribe(ctx.response::send)
+ }
+ }
+
+ companion object {
+ private const val CONTENT_TEXT = "text/plain"
+ private const val CONTENT_JSON = "application/json"
}
}
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>copy-internal-deps</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
- <includeGroupIds>${project.parent.groupId}</includeGroupIds>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- <execution>
- <id>copy-external-deps</id>
- <phase>package</phase>
- <goals>
- <goal>copy-dependencies</goal>
- </goals>
- <configuration>
- <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
- <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
- <includeScope>runtime</includeScope>
- </configuration>
- </execution>
- </executions>
</plugin>
- <!--
+ <!-- TODO: unskip docker
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<skipAnalysis>true</skipAnalysis>
<!-- Docker -->
- <skipDocker>true</skipDocker>
+ <skipDocker>true</skipDocker> <!-- TODO: unskip docker -->
<docker-image.name>ves-hv-collector/${project.artifactId}</docker-image.name>
<docker-image.namespace>onap</docker-image.namespace>
</properties>
</dependency>
</dependencies>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>3.1.1</version>
- </plugin>
</plugins>
</pluginManagement>
<extensions>
</images>
</configuration>
</plugin>
-
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>3.1.1</version>
+ <executions>
+ <execution>
+ <id>copy-internal-deps</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/libs/internal</outputDirectory>
+ <includeGroupIds>${project.parent.groupId}</includeGroupIds>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ <execution>
+ <id>copy-external-deps</id>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/libs/external</outputDirectory>
+ <excludeGroupIds>${project.parent.groupId}</excludeGroupIds>
+ <includeScope>runtime</includeScope>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</pluginManagement>
</build>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>