Remove Ratpack dependency for HV-VES health checks 27/60527/10
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Tue, 14 Aug 2018 10:52:28 +0000 (12:52 +0200)
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 16 Aug 2018 08:09:12 +0000 (10:09 +0200)
In order to minimize complexity and possibly improve performance (thread count)
reactor-netty should be used instead of Ratpack. Also reorganize code to
be more consistent and differentiated readiness and liveness endpoints
(for future use in K8s Pod definition).

As an example I've defined health check probe in docker-compose YAML.

Change-Id: I1b5ce3d685e7ae5b0515b2146ae4fa88b3b41186
Issue-ID: DCAEGEN2-705
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
25 files changed:
docker-compose.yml
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt [moved from hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt with 78% similarity]
hv-collector-health-check/pom.xml
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt [deleted file]
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt [moved from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt with 73% similarity]
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt [new file with mode: 0644]
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt [new file with mode: 0644]
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt [moved from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt with 74% similarity]
hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
hv-collector-main/pom.xml
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt [new file with mode: 0644]
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt [new file with mode: 0644]
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt [new file with mode: 0644]
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt [new file with mode: 0644]
pom.xml

index 7bd84f5..33aedec 100644 (file)
@@ -1,6 +1,5 @@
-version: "2"
+version: "3.4"
 services:
-
   zookeeper:
     image: wurstmeister/zookeeper
     ports:
@@ -29,14 +28,22 @@ services:
       command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
 
   ves-hv-collector:
-    image: onap/ves-hv-collector
+    image: nexus3.onap.org:10003/onap/ves-hv-collector:latest
 #    build:
 #      context: hv-collector-main
 #      dockerfile: Dockerfile
     ports:
       - "6060:6060"
       - "6061:6061/tcp"
-    command: ["--listen-port", "6061","--config-url", "http://consul:8500/v1/kv/veshv-config"]
+    command: ["--listen-port", "6061",
+              "--health-check-api-port", "6060",
+              "--config-url", "http://consul:8500/v1/kv/veshv-config"]
+    healthcheck:
+      test: curl -f http://localhost:6060/health/ready || exit 1
+      interval: 10s
+      timeout: 3s
+      retries: 3
+      start_period: 20s
     depends_on:
       - kafka
       - consul
@@ -44,7 +51,7 @@ services:
       - ./ssl/:/etc/ves-hv/
 
   xnf-simulator:
-    image: onap/ves-hv-collector-xnf-simulator
+    image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator
 #    build:
 #      context: hv-collector-xnf-simulator
 #      dockerfile: Dockerfile
@@ -57,7 +64,7 @@ services:
       - ./ssl/:/etc/ves-hv/
 
   dcae-app-simulator:
-    image: onap/ves-hv-collector-dcae-simulator
+    image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator
 #    build:
 #      context: hv-collector-dcae-app-simulator
 #      dockerfile: Dockerfile
index ff99717..6c256b7 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.boundary
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
@@ -34,8 +35,3 @@ typealias CollectorProvider = () -> Collector
 interface Server {
     fun start(): IO<ServerHandle>
 }
-
-abstract class ServerHandle(val host: String, val port: Int) {
-    abstract fun shutdown(): IO<Unit>
-    abstract fun await(): IO<Unit>
-}
index 3e652b9..a400ff3 100644 (file)
@@ -25,8 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference
 class CollectorFactory(val configuration: ConfigurationProvider,
                        private val sinkProvider: SinkProvider,
                        private val metrics: Metrics,
-                       private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
+                       private val healthState: HealthState = HealthState.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
         val collector: AtomicReference<Collector> = AtomicReference()
@@ -50,11 +50,11 @@ class CollectorFactory(val configuration: ConfigurationProvider,
                 .map(this::createVesHvCollector)
                 .doOnNext {
                     logger.info("Using updated configuration for new connections")
-                    healthStateProvider.changeState(HealthState.HEALTHY)
+                    healthState.changeState(HealthDescription.HEALTHY)
                 }
                 .doOnError {
                     logger.error("Failed to acquire configuration from consul")
-                    healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+                    healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
                 }
                 .subscribe(collector::set)
         return collector::get
index 8146303..7de2830 100644 (file)
@@ -20,8 +20,8 @@
 package org.onap.dcae.collectors.veshv.impl.adapters
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -46,7 +46,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
                                            private val url: String,
                                            private val firstRequestDelay: Duration,
                                            private val requestInterval: Duration,
-                                           private val healthStateProvider: HealthStateProvider,
+                                           private val healthState: HealthState,
                                            retrySpec: Retry<Any>
 
 ) : ConfigurationProvider {
@@ -55,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
     private val retry = retrySpec
             .doOnRetry {
                 logger.warn("Could not get fresh configuration", it.exception())
-                healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+                healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
             }
 
     constructor(http: HttpAdapter,
@@ -64,7 +64,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
             params.configurationUrl,
             params.firstRequestDelay,
             params.requestInterval,
-            HealthStateProvider.INSTANCE,
+            HealthState.INSTANCE,
             Retry.any<Any>()
                     .retryMax(MAX_RETRIES)
                     .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
index c28b151..f858d95 100644 (file)
@@ -24,15 +24,15 @@ import arrow.effects.IO
 import io.netty.handler.ssl.SslContext
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Mono
 import reactor.ipc.netty.NettyInbound
 import reactor.ipc.netty.NettyOutbound
 import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.BlockingNettyContext
 import reactor.ipc.netty.tcp.TcpServer
 import java.time.Duration
 import java.util.function.BiFunction
@@ -94,16 +94,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
         return this
     }
 
-    private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
-        override fun shutdown() = IO {
-            ctx.shutdown()
-        }
-
-        override fun await() = IO<Unit> {
-            ctx.context.channel().closeFuture().sync()
-        }
-    }
-
     companion object {
         private val logger = Logger(NettyTcpServer::class)
     }
index f9a9ba6..7885892 100644 (file)
@@ -28,8 +28,8 @@ import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
@@ -47,7 +47,7 @@ internal object ConsulConfigurationProviderTest : Spek({
     describe("Consul configuration provider") {
 
         val httpAdapterMock: HttpAdapter = mock()
-        val healthStateProvider = HealthStateProvider.INSTANCE
+        val healthStateProvider = HealthState.INSTANCE
 
         given("valid resource url") {
             val validUrl = "http://valid-url/"
@@ -98,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({
                 it("should update the health state"){
                     StepVerifier.create(healthStateProvider().take(iterationCount))
                             .expectNextCount(iterationCount - 1)
-                            .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+                            .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
                             .verifyComplete()
                 }
             }
@@ -109,7 +109,7 @@ internal object ConsulConfigurationProviderTest : Spek({
 
 private fun constructConsulConfigProvider(url: String,
                                           httpAdapter: HttpAdapter,
-                                          healthStateProvider: HealthStateProvider,
+                                          healthState: HealthState,
                                           iterationCount: Long = 1
 ): ConsulConfigurationProvider {
 
@@ -122,7 +122,7 @@ private fun constructConsulConfigProvider(url: String,
             url,
             firstRequestDelay,
             requestInterval,
-            healthStateProvider,
+            healthState,
             retry
     )
 }
index e7b7770..e9b7057 100644 (file)
@@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import reactor.core.publisher.Flux
@@ -40,7 +40,7 @@ import java.time.Duration
  */
 class Sut(sink: Sink = StoringSink()) {
     val configurationProvider = FakeConfigurationProvider()
-    val healthStateProvider = FakeHealthStateProvider()
+    val healthStateProvider = FakeHealthState()
 
     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
     private val metrics = FakeMetrics()
index 493517b..a9f3e9a 100644 (file)
@@ -24,7 +24,7 @@ import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
@@ -351,7 +351,7 @@ object VesHvSpecification : Spek({
             it("should mark the application healthy") {
                 assertThat(sut.healthStateProvider.currentHealth)
                         .describedAs("application health state")
-                        .isEqualTo(HealthState.HEALTHY)
+                        .isEqualTo(HealthDescription.HEALTHY)
             }
         }
 
@@ -363,7 +363,7 @@ object VesHvSpecification : Spek({
             it("should mark the application unhealthy ") {
                 assertThat(sut.healthStateProvider.currentHealth)
                         .describedAs("application health state")
-                        .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+                        .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
             }
         }
     }
  */
 package org.onap.dcae.collectors.veshv.tests.fakes
 
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
 import reactor.core.publisher.Flux
 
-class FakeHealthStateProvider : HealthStateProvider {
+class FakeHealthState : HealthState {
 
-    lateinit var currentHealth: HealthState
+    lateinit var currentHealth: HealthDescription
 
-    override fun changeState(healthState: HealthState) {
-        currentHealth = healthState
+    override fun changeState(healthDescription: HealthDescription) {
+        currentHealth = healthDescription
     }
 
-    override fun invoke(): Flux<HealthState> {
+    override fun invoke(): Flux<HealthDescription> {
         throw NotImplementedError()
     }
 }
index 1e77adb..0951587 100644 (file)
@@ -50,8 +50,8 @@
             <artifactId>kotlin-stdlib-jdk8</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.ratpack</groupId>
-            <artifactId>ratpack-core</artifactId>
+            <groupId>io.projectreactor.ipc</groupId>
+            <artifactId>reactor-netty</artifactId>
         </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt
deleted file mode 100644 (file)
index b21d187..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.healthcheck.api
-
-import arrow.effects.IO
-import ratpack.handling.Chain
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import java.util.concurrent.atomic.AtomicReference
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since August 2018
- */
-class HealthCheckApiServer(private val healthStateProvider: HealthStateProvider) {
-
-    private val healthState: AtomicReference<HealthState> = AtomicReference(HealthState.STARTING)
-
-    fun start(port: Int): IO<RatpackServer> = IO {
-        healthStateProvider().subscribe(healthState::set)
-        RatpackServer
-                .start {
-                    it
-                            .serverConfig(ServerConfig.embedded().port(port).development(false))
-                            .handlers(this::configureHandlers)
-                }
-    }
-
-    private fun configureHandlers(chain: Chain) {
-        chain
-                .get("healthcheck") { ctx ->
-                    healthState.get().run {
-                        ctx.response.status(responseCode).send(message)
-                    }
-                }
-    }
-}
\ No newline at end of file
  */
 package org.onap.dcae.collectors.veshv.healthcheck.api
 
-import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl
-import reactor.core.publisher.Flux
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since August 2018
  */
-interface HealthStateProvider {
-
-    operator fun invoke(): Flux<HealthState>
-    fun changeState(healthState: HealthState)
-
-    companion object {
-        val INSTANCE: HealthStateProvider by lazy {
-            HealthStateProviderImpl()
-        }
-    }
+enum class HealthDescription(val message: String, val status: HealthStatus) {
+    HEALTHY("Healthy", HealthStatus.UP),
+    STARTING("Collector is starting", HealthStatus.OUT_OF_SERVICE),
+    RETRYING_FOR_CONSUL_CONFIGURATION("Consul configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE),
+    CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", HealthStatus.DOWN)
 }
index 3dddf1e..853cc00 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.healthcheck.api
 
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateImpl
+import reactor.core.publisher.Flux
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since August 2018
  */
-enum class HealthState(val message: String, val responseCode: Int) {
-    HEALTHY("Healthy", OK),
-    STARTING("Collector is starting", SERVICE_UNAVAILABLE),
-    WAITING_FOR_CONSUL_CONFIGURATION("Waiting for consul configuration", SERVICE_UNAVAILABLE),
-    CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", SERVICE_UNAVAILABLE)
+interface HealthState {
+
+    operator fun invoke(): Flux<HealthDescription>
+    fun changeState(healthDescription: HealthDescription)
+
+    companion object {
+        val INSTANCE: HealthState by lazy {
+            HealthStateImpl()
+        }
+    }
 }
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
new file mode 100644 (file)
index 0000000..79fc932
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.api
+
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+enum class HealthStatus(val httpResponseStatus: Int) {
+    UP(OK),
+    DOWN(SERVICE_UNAVAILABLE),
+    OUT_OF_SERVICE(SERVICE_UNAVAILABLE),
+    UNKNOWN(SERVICE_UNAVAILABLE)
+}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
new file mode 100644 (file)
index 0000000..7e9efac
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.factory
+
+import arrow.effects.IO
+import io.netty.handler.codec.http.HttpResponseStatus
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.ipc.netty.http.server.HttpServer
+import reactor.ipc.netty.http.server.HttpServerRequest
+import reactor.ipc.netty.http.server.HttpServerResponse
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+class HealthCheckApiServer(private val healthState: HealthState, private val port: Int) {
+
+    private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING)
+
+    fun start(): IO<ServerHandle> = IO {
+        healthState().subscribe(healthDescription::set)
+        val ctx = HttpServer.create(port).startRouter { routes ->
+            routes.get("/health/ready", ::readinessHandler)
+            routes.get("/health/alive", ::livenessHandler)
+        }
+        NettyServerHandle(ctx)
+    }
+
+    private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
+            healthDescription.get().run {
+                resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message))
+            }
+
+    private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
+                resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
+
+}
@@ -19,8 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.healthcheck.impl
 
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import reactor.core.publisher.Flux
 import reactor.core.publisher.FluxProcessor
 import reactor.core.publisher.UnicastProcessor
@@ -29,11 +29,11 @@ import reactor.core.publisher.UnicastProcessor
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since August 2018
  */
-internal class HealthStateProviderImpl : HealthStateProvider {
+internal class HealthStateImpl : HealthState {
 
-    private val healthStateStream: FluxProcessor<HealthState, HealthState> = UnicastProcessor.create()
+    private val healthDescriptionStream: FluxProcessor<HealthDescription, HealthDescription> = UnicastProcessor.create()
 
-    override fun invoke(): Flux<HealthState> = healthStateStream
+    override fun invoke(): Flux<HealthDescription> = healthDescriptionStream
 
-    override fun changeState(healthState: HealthState) = healthStateStream.onNext(healthState)
+    override fun changeState(healthDescription: HealthDescription) = healthDescriptionStream.onNext(healthDescription)
 }
index e9c487b..e3fced2 100644 (file)
@@ -21,10 +21,9 @@ package org.onap.dcae.collectors.veshv.healthcheck.impl
 
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import reactor.test.StepVerifier
 
 /**
@@ -33,20 +32,20 @@ import reactor.test.StepVerifier
  */
 object HealthStateProviderImplTest : Spek({
     describe("Health state provider") {
-            val healthStateProviderImpl = HealthStateProviderImpl()
+            val healthStateProviderImpl = HealthStateImpl()
             on("health state update") {
-                healthStateProviderImpl.changeState(HealthState.HEALTHY)
-                healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
-                healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
-                healthStateProviderImpl.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+                healthStateProviderImpl.changeState(HealthDescription.HEALTHY)
+                healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+                healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+                healthStateProviderImpl.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
 
                 it("should push new health state to the subscriber") {
                     StepVerifier
                             .create(healthStateProviderImpl().take(4))
-                            .expectNext(HealthState.HEALTHY)
-                            .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
-                            .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
-                            .expectNext(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+                            .expectNext(HealthDescription.HEALTHY)
+                            .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+                            .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+                            .expectNext(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
                             .verifyComplete()
                 }
             }
index 0e95628..af64ced 100644 (file)
             <version>${project.parent.version}</version>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>io.ratpack</groupId>
-            <artifactId>ratpack-core</artifactId>
-        </dependency>
         <dependency>
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-core</artifactId>
index dc92228..a84a39a 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.main
 
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
-import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
+import org.onap.dcae.collectors.veshv.main.servers.VesServer
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
@@ -38,13 +37,7 @@ private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainK
 fun main(args: Array<String>) =
         ArgVesHvConfiguration().parse(args)
                 .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
-                .map(::startHealthCheckApiServer)
-                .map(::createServer)
-                .map {
-                    it.start()
-                            .map(::logServerStarted)
-                            .flatMap(ServerHandle::await)
-                }
+                .map(::startAndAwaitServers)
                 .unsafeRunEitherSync(
                         { ex ->
                             logger.error("Failed to start a server", ex)
@@ -53,24 +46,9 @@ fun main(args: Array<String>) =
                         { logger.info("Gentle shutdown") }
                 )
 
-private fun createServer(config: ServerConfiguration): Server {
-    val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
-    val collectorProvider = CollectorFactory(
-            AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
-            sink,
-            MicrometerMetrics()
-    ).createVesHvCollectorProvider()
-
-    return ServerFactory.createNettyTcpServer(config, collectorProvider)
-}
-
-private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also {
-    logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}")
-}
-
-private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply {
-    HealthCheckApiServer(HealthStateProvider.INSTANCE)
-            .start(healthCheckApiPort)
-            .unsafeRunSync()
-            .also { logger.info("Health check api server started on port ${it.bindPort}") }
-}
+private fun startAndAwaitServers(config: ServerConfiguration) =
+        IO.monad().binding {
+            HealthCheckServer.start(config).bind()
+            VesServer.start(config).bind()
+                    .await().bind()
+        }.fix()
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
new file mode 100644 (file)
index 0000000..04fc021
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object HealthCheckServer : ServerStarter() {
+    override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
+
+    private fun createHealthCheckServer(config: ServerConfiguration) =
+            HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort)
+
+    override fun serverStartedMessage(handle: ServerHandle) =
+            "Health check server is up and listening on ${handle.host}:${handle.port}"
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
new file mode 100644 (file)
index 0000000..5c6f127
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+abstract class ServerStarter {
+    fun start(config: ServerConfiguration): IO<ServerHandle> =
+            startServer(config)
+                    .map { logger.info(serverStartedMessage(it)); it }
+
+    protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
+    protected abstract fun serverStartedMessage(handle: ServerHandle): String
+
+    companion object {
+        private val logger = Logger(ServerStarter::class)
+    }
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
new file mode 100644 (file)
index 0000000..fbf8936
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.main.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object VesServer : ServerStarter() {
+    override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
+
+    private fun createVesServer(config: ServerConfiguration): Server {
+        val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
+        val collectorProvider = CollectorFactory(
+                AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+                sink,
+                MicrometerMetrics()
+        ).createVesHvCollectorProvider()
+
+        return ServerFactory.createNettyTcpServer(config, collectorProvider)
+    }
+
+    override fun serverStartedMessage(handle: ServerHandle) =
+            "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+}
\ No newline at end of file
index d20ffac..081dd0d 100644 (file)
@@ -23,7 +23,7 @@ package org.onap.dcae.collectors.veshv.utils.http
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since August 2018
  */
-class Status{
+class Status {
     companion object {
         const val OK = 200
         const val SERVICE_UNAVAILABLE = 503
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
new file mode 100644 (file)
index 0000000..bb924f2
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import arrow.effects.IO
+import reactor.ipc.netty.tcp.BlockingNettyContext
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+abstract class ServerHandle(val host: String, val port: Int) {
+    abstract fun shutdown(): IO<Unit>
+    abstract fun await(): IO<Unit>
+}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class NettyServerHandle(private val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+    override fun shutdown() = IO {
+        ctx.shutdown()
+    }
+
+    override fun await() = IO<Unit> {
+        ctx.context.channel().closeFuture().sync()
+    }
+}
diff --git a/pom.xml b/pom.xml
index 7504d4f..3f38d52 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
         <docker-image.namespace>onap</docker-image.namespace>
         <docker-image.name>${project.groupId}.${project.artifactId}</docker-image.name>
         <docker.http_proxy></docker.http_proxy>
+
     </properties>
 
 
                     <version>${kotlin.version}</version>
                     <configuration>
                         <jvmTarget>1.8</jvmTarget>
+                        <experimentalCoroutines>enable</experimentalCoroutines>
                     </configuration>
                     <executions>
                         <execution>