Implement simple health check mechanism 03/59603/6
authorJakub Dudycz <jakub.dudycz@nokia.com>
Wed, 8 Aug 2018 07:17:14 +0000 (09:17 +0200)
committerJakub Dudycz <jakub.dudycz@nokia.com>
Thu, 9 Aug 2018 08:46:48 +0000 (10:46 +0200)
Change-Id: Ic4b8b59ced9dc19c9ebf26131036a9e1a752164f
Issue-ID: DCAEGEN2-659
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
20 files changed:
hv-collector-core/pom.xml
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
hv-collector-ct/pom.xml
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt [new file with mode: 0644]
hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
hv-collector-health-check/pom.xml
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt [moved from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/http/HealthCheckApiServer.kt with 76% similarity]
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt [new file with mode: 0644]
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt [new file with mode: 0644]
hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt [new file with mode: 0644]
hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt [new file with mode: 0644]
hv-collector-main/Dockerfile
hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt [new file with mode: 0644]
pom.xml

index 06687b7..784b247 100644 (file)
             <artifactId>hv-collector-utils</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>hv-collector-health-check</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <artifactId>hv-collector-domain</artifactId>
index 7be24d2..3e652b9 100644 (file)
@@ -25,6 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -39,19 +41,20 @@ import java.util.concurrent.atomic.AtomicReference
  */
 class CollectorFactory(val configuration: ConfigurationProvider,
                        private val sinkProvider: SinkProvider,
-                       private val metrics: Metrics) {
+                       private val metrics: Metrics,
+                       private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
         val collector: AtomicReference<Collector> = AtomicReference()
         configuration()
                 .map(this::createVesHvCollector)
-                .doOnNext { logger.info("Using updated configuration for new connections") }
+                .doOnNext {
+                    logger.info("Using updated configuration for new connections")
+                    healthStateProvider.changeState(HealthState.HEALTHY)
+                }
                 .doOnError {
-                    logger.error("Shutting down", it)
-                    // TODO: create Health class
-                    // It should monitor all incidents and expose the results for the
-                    // container health check mechanism
-                    System.exit(ERROR_CODE)
+                    logger.error("Failed to acquire configuration from consul")
+                    healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
                 }
                 .subscribe(collector::set)
         return collector::get
@@ -67,7 +70,6 @@ class CollectorFactory(val configuration: ConfigurationProvider,
     }
 
     companion object {
-        private const val ERROR_CODE = 3
         private val logger = Logger(CollectorFactory::class)
     }
 }
index 7248db6..07b5c82 100644 (file)
@@ -24,7 +24,6 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import reactor.ipc.netty.http.client.HttpClient
-import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,9 +34,7 @@ object AdapterFactory {
     fun loggingSink(): SinkProvider = LoggingSinkProvider()
 
     fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
-            ConsulConfigurationProvider(
-                    httpAdapter(),
-                    configurationProviderParams)
+            ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
 
     fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
 }
index 6f04c95..8146303 100644 (file)
 package org.onap.dcae.collectors.veshv.impl.adapters
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
-import org.slf4j.LoggerFactory
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.retry.Jitter
@@ -45,24 +46,30 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
                                            private val url: String,
                                            private val firstRequestDelay: Duration,
                                            private val requestInterval: Duration,
+                                           private val healthStateProvider: HealthStateProvider,
                                            retrySpec: Retry<Any>
+
 ) : ConfigurationProvider {
 
     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
     private val retry = retrySpec
             .doOnRetry {
                 logger.warn("Could not get fresh configuration", it.exception())
+                healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
             }
 
-    constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
+    constructor(http: HttpAdapter,
+                params: ConfigurationProviderParams) : this(
             http,
             params.configurationUrl,
             params.firstRequestDelay,
             params.requestInterval,
+            HealthStateProvider.INSTANCE,
             Retry.any<Any>()
                     .retryMax(MAX_RETRIES)
                     .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
-                    .jitter(Jitter.random()))
+                    .jitter(Jitter.random())
+    )
 
     override fun invoke(): Flux<CollectorConfiguration> =
             Flux.interval(firstRequestDelay, requestInterval)
index 1626c02..f9a9ba6 100644 (file)
@@ -23,9 +23,13 @@ import com.nhaarman.mockito_kotlin.eq
 import com.nhaarman.mockito_kotlin.mock
 import com.nhaarman.mockito_kotlin.whenever
 import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
 import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
@@ -40,62 +44,89 @@ import kotlin.test.assertEquals
  */
 internal object ConsulConfigurationProviderTest : Spek({
 
-    val httpAdapterMock: HttpAdapter = mock()
-    val firstRequestDelay = Duration.ofMillis(1)
-    val requestInterval = Duration.ofMillis(1)
-    val retry = Retry.onlyIf<Any> { it.iteration() < 2 }.fixedBackoff(Duration.ofNanos(1))
+    describe("Consul configuration provider") {
 
-    given("valid resource url") {
+        val httpAdapterMock: HttpAdapter = mock()
+        val healthStateProvider = HealthStateProvider.INSTANCE
 
-        val validUrl = "http://valid-url/"
-        val consulConfigProvider = ConsulConfigurationProvider(
-                httpAdapterMock,
-                validUrl,
-                firstRequestDelay,
-                requestInterval,
-                retry)
+        given("valid resource url") {
+            val validUrl = "http://valid-url/"
+            val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
 
-        whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
-                .thenReturn(Mono.just(constructConsulResponse()))
+            on("call to consul") {
+                whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+                        .thenReturn(Mono.just(constructConsulResponse()))
 
-        it("should use received configuration") {
+                it("should use received configuration") {
 
-            StepVerifier.create(consulConfigProvider().take(1))
-                    .consumeNextWith {
+                    StepVerifier.create(consulConfigProvider().take(1))
+                            .consumeNextWith {
 
-                        assertEquals("kafka:9093", it.kafkaBootstrapServers)
+                                assertEquals("kafka:9093", it.kafkaBootstrapServers)
 
-                        val route1 = it.routing.routes[0]
-                        assertEquals(Domain.FAULT, route1.domain)
-                        assertEquals("test-topic-1", route1.targetTopic)
+                                val route1 = it.routing.routes[0]
+                                assertEquals(Domain.FAULT, route1.domain)
+                                assertEquals("test-topic-1", route1.targetTopic)
 
-                        val route2 = it.routing.routes[1]
-                        assertEquals(Domain.HEARTBEAT, route2.domain)
-                        assertEquals("test-topic-2", route2.targetTopic)
+                                val route2 = it.routing.routes[1]
+                                assertEquals(Domain.HEARTBEAT, route2.domain)
+                                assertEquals("test-topic-2", route2.targetTopic)
+
+                            }.verifyComplete()
+                }
+            }
 
-                    }.verifyComplete()
+        }
+        given("invalid resource url") {
+            val invalidUrl = "http://invalid-url/"
+
+            val iterationCount = 3L
+            val consulConfigProvider = constructConsulConfigProvider(
+                    invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+            )
+
+            on("call to consul") {
+                whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+                        .thenReturn(Mono.error(RuntimeException("Test exception")))
+
+                it("should interrupt the flux") {
+
+                    StepVerifier.create(consulConfigProvider())
+                            .verifyErrorMessage("Test exception")
+                }
+
+                it("should update the health state"){
+                    StepVerifier.create(healthStateProvider().take(iterationCount))
+                            .expectNextCount(iterationCount - 1)
+                            .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+                            .verifyComplete()
+                }
+            }
         }
     }
-    given("invalid resource url") {
 
-        val invalidUrl = "http://invalid-url/"
-        val consulConfigProvider = ConsulConfigurationProvider(
-                httpAdapterMock,
-                invalidUrl,
-                firstRequestDelay,
-                requestInterval,
-                retry)
+})
 
-        whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
-                .thenReturn(Mono.error(RuntimeException("Test exception")))
+private fun constructConsulConfigProvider(url: String,
+                                          httpAdapter: HttpAdapter,
+                                          healthStateProvider: HealthStateProvider,
+                                          iterationCount: Long = 1
+): ConsulConfigurationProvider {
 
-        it("should interrupt the flux") {
+    val firstRequestDelay = Duration.ofMillis(1)
+    val requestInterval = Duration.ofMillis(1)
+    val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+    return ConsulConfigurationProvider(
+            httpAdapter,
+            url,
+            firstRequestDelay,
+            requestInterval,
+            healthStateProvider,
+            retry
+    )
+}
 
-            StepVerifier.create(consulConfigProvider())
-                    .verifyErrorMessage("Test exception")
-        }
-    }
-})
 
 fun constructConsulResponse(): String {
 
index 347bbbe..f4150c2 100644 (file)
             <groupId>org.jetbrains.spek</groupId>
             <artifactId>spek-junit-platform-engine</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+        </dependency>
     </dependencies>
 
 
index aaadcc7..e7b7770 100644 (file)
@@ -28,6 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import reactor.core.publisher.Flux
@@ -39,10 +40,11 @@ import java.time.Duration
  */
 class Sut(sink: Sink = StoringSink()) {
     val configurationProvider = FakeConfigurationProvider()
+    val healthStateProvider = FakeHealthStateProvider()
 
     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
     private val metrics = FakeMetrics()
-    private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics)
+    private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider)
     private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
     val collector: Collector
index 1f07c23..493517b 100644 (file)
@@ -22,15 +22,24 @@ package org.onap.dcae.collectors.veshv.tests.component
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.tests.fakes.*
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
 import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
-import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
 import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
+import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import reactor.core.publisher.Flux
 import java.time.Duration
 
@@ -219,127 +228,143 @@ object VesHvSpecification : Spek({
 
         val defaultTimeout = Duration.ofSeconds(10)
 
-        it("should update collector on configuration change") {
-            val (sut, _) = vesHvWithStoringSink()
+        given("successful configuration change") {
 
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
-            val firstCollector = sut.collector
+            lateinit var sut: Sut
+            lateinit var sink: StoringSink
 
-            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
-            val collectorAfterUpdate = sut.collector
+            beforeEachTest {
+                vesHvWithStoringSink().run {
+                    sut = first
+                    sink = second
+                }
+            }
 
-            assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+            it("should update collector") {
+                val firstCollector = sut.collector
 
-        }
+                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                val collectorAfterUpdate = sut.collector
 
-        it("should start routing messages on configuration change") {
-            val (sut, sink) = vesHvWithStoringSink()
+                assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+            }
 
-            sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+            it("should start routing messages") {
 
-            val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
-            assertThat(messages).isEmpty()
+                sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
 
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+                val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                assertThat(messages).isEmpty()
 
-            val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
-            assertThat(messagesAfterUpdate).hasSize(1)
-            val message = messagesAfterUpdate[0]
+                sut.configurationProvider.updateConfiguration(basicConfiguration)
 
-            assertThat(message.topic).describedAs("routed message topic after configuration's change")
-                    .isEqualTo(HVRANMEAS_TOPIC)
-            assertThat(message.partition).describedAs("routed message partition")
-                    .isEqualTo(0)
-        }
+                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                assertThat(messagesAfterUpdate).hasSize(1)
+                val message = messagesAfterUpdate[0]
 
-        it("should change domain routing on configuration change") {
-            val (sut, sink) = vesHvWithStoringSink()
+                assertThat(message.topic).describedAs("routed message topic after configuration's change")
+                        .isEqualTo(HVRANMEAS_TOPIC)
+                assertThat(message.partition).describedAs("routed message partition")
+                        .isEqualTo(0)
+            }
 
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            it("should change domain routing") {
 
-            val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
-            assertThat(messages).hasSize(1)
-            val firstMessage = messages[0]
+                val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                assertThat(messages).hasSize(1)
+                val firstMessage = messages[0]
 
-            assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
-                    .isEqualTo(HVRANMEAS_TOPIC)
-            assertThat(firstMessage.partition).describedAs("routed message partition")
-                    .isEqualTo(0)
+                assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+                        .isEqualTo(HVRANMEAS_TOPIC)
+                assertThat(firstMessage.partition).describedAs("routed message partition")
+                        .isEqualTo(0)
 
 
-            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
 
-            val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
-            assertThat(messagesAfterUpdate).hasSize(2)
-            val secondMessage = messagesAfterUpdate[1]
+                val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                assertThat(messagesAfterUpdate).hasSize(2)
+                val secondMessage = messagesAfterUpdate[1]
 
-            assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
-                    .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
-            assertThat(secondMessage.partition).describedAs("routed message partition")
-                    .isEqualTo(0)
-        }
+                assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+                        .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+                assertThat(secondMessage.partition).describedAs("routed message partition")
+                        .isEqualTo(0)
+            }
 
-        it("should update routing for each client sending one message") {
-            val (sut, sink) = vesHvWithStoringSink()
+            it("should update routing for each client sending one message") {
 
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
+                val messagesAmount = 10
+                val messagesForEachTopic = 5
 
-            val messagesAmount = 10
-            val messagesForEachTopic = 5
+                Flux.range(0, messagesAmount).doOnNext {
+                    if (it == messagesForEachTopic) {
+                        sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                    }
+                }.doOnNext {
+                    sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+                }.then().block(defaultTimeout)
 
-            Flux.range(0, messagesAmount).doOnNext {
-                if (it == messagesForEachTopic) {
-                    sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
-                }
-            }.doOnNext {
-                sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
-            }.then().block(defaultTimeout)
 
+                val messages = sink.sentMessages
+                val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
 
-            val messages = sink.sentMessages
-            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
-            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+                assertThat(messages.size).isEqualTo(messagesAmount)
+                assertThat(messagesForEachTopic)
+                        .describedAs("amount of messages routed to each topic")
+                        .isEqualTo(firstTopicMessagesCount)
+                        .isEqualTo(secondTopicMessagesCount)
+            }
 
-            assertThat(messages.size).isEqualTo(messagesAmount)
-            assertThat(messagesForEachTopic)
-                    .describedAs("amount of messages routed to each topic")
-                    .isEqualTo(firstTopicMessagesCount)
-                    .isEqualTo(secondTopicMessagesCount)
-        }
+            it("should not update routing for client sending continuous stream of messages") {
 
+                val messageStreamSize = 10
+                val pivot = 5
 
-        it("should not update routing for client sending continuous stream of messages") {
-            val (sut, sink) = vesHvWithStoringSink()
+                val incomingMessages = Flux.range(0, messageStreamSize)
+                        .doOnNext {
+                            if (it == pivot) {
+                                sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                                println("config changed")
+                            }
+                        }
+                        .map { vesWireFrameMessage(Domain.HVRANMEAS) }
 
-            sut.configurationProvider.updateConfiguration(basicConfiguration)
 
-            val messageStreamSize = 10
-            val pivot = 5
+                sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
 
-            val incomingMessages = Flux.range(0, messageStreamSize)
-                    .doOnNext {
-                        if (it == pivot) {
-                            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
-                            println("config changed")
-                        }
-                    }
-                    .map { vesWireFrameMessage(Domain.HVRANMEAS) }
+                val messages = sink.sentMessages
+                val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
 
+                assertThat(messages.size).isEqualTo(messageStreamSize)
+                assertThat(firstTopicMessagesCount)
+                        .describedAs("amount of messages routed to first topic")
+                        .isEqualTo(messageStreamSize)
 
-            sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+                assertThat(secondTopicMessagesCount)
+                        .describedAs("amount of messages routed to second topic")
+                        .isEqualTo(0)
+            }
 
-            val messages = sink.sentMessages
-            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
-            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+            it("should mark the application healthy") {
+                assertThat(sut.healthStateProvider.currentHealth)
+                        .describedAs("application health state")
+                        .isEqualTo(HealthState.HEALTHY)
+            }
+        }
 
-            assertThat(messages.size).isEqualTo(messageStreamSize)
-            assertThat(firstTopicMessagesCount)
-                    .describedAs("amount of messages routed to first topic")
-                    .isEqualTo(messageStreamSize)
+        given("failed configuration change") {
+            val (sut, _) = vesHvWithStoringSink()
+            sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
 
-            assertThat(secondTopicMessagesCount)
-                    .describedAs("amount of messages routed to second topic")
-                    .isEqualTo(0)
+            it("should mark the application unhealthy ") {
+                assertThat(sut.healthStateProvider.currentHealth)
+                        .describedAs("application health state")
+                        .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+            }
         }
     }
 
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt
new file mode 100644 (file)
index 0000000..09fd232
--- /dev/null
@@ -0,0 +1,18 @@
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import reactor.core.publisher.Flux
+
+class FakeHealthStateProvider : HealthStateProvider {
+
+    lateinit var currentHealth: HealthState
+
+    override fun changeState(healthState: HealthState) {
+        currentHealth = healthState
+    }
+
+    override fun invoke(): Flux<HealthState> {
+        throw NotImplementedError()
+    }
+}
index b89113f..ebeaa69 100644 (file)
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import reactor.core.publisher.FluxProcessor
 import reactor.core.publisher.UnicastProcessor
+import reactor.retry.RetryExhaustedException
 
 
 const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
@@ -83,10 +84,19 @@ val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration
 )
 
 class FakeConfigurationProvider : ConfigurationProvider {
+    private var shouldThrowException = false
     private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
 
-    fun updateConfiguration(collectorConfiguration: CollectorConfiguration) {
-        configStream.onNext(collectorConfiguration)
+    fun updateConfiguration(collectorConfiguration: CollectorConfiguration) =
+            if (shouldThrowException) {
+                configStream.onError(RetryExhaustedException("I'm so tired"))
+            } else {
+                configStream.onNext(collectorConfiguration)
+            }
+
+
+    fun shouldThrowExceptionOnConfigUpdate(shouldThrowException: Boolean) {
+        this.shouldThrowException = shouldThrowException
     }
 
     override fun invoke() = configStream
index 4072ec2..1e77adb 100644 (file)
@@ -14,7 +14,6 @@
 
     <properties>
         <skipAnalysis>false</skipAnalysis>
-        <failIfMissingUnitTests>false</failIfMissingUnitTests>
     </properties>
 
     <parent>
     </build>
 
     <dependencies>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>hv-collector-utils</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-stdlib-jdk8</artifactId>
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-effects</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.spek</groupId>
+            <artifactId>spek-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.spek</groupId>
+            <artifactId>spek-junit-platform-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.healthcheck.http
+package org.onap.dcae.collectors.veshv.healthcheck.api
 
 import arrow.effects.IO
 import ratpack.handling.Chain
-import ratpack.http.Status
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since August 2018
  */
-class HealthCheckApiServer {
+class HealthCheckApiServer(private val healthStateProvider: HealthStateProvider) {
+
+    private val healthState: AtomicReference<HealthState> = AtomicReference(HealthState.STARTING)
 
     fun start(port: Int): IO<RatpackServer> = IO {
+        healthStateProvider().subscribe(healthState::set)
         RatpackServer
                 .start {
                     it
@@ -43,7 +46,9 @@ class HealthCheckApiServer {
     private fun configureHandlers(chain: Chain) {
         chain
                 .get("healthcheck") { ctx ->
-                    ctx.response.status(Status.OK).send()
+                    healthState.get().run {
+                        ctx.response.status(responseCode).send(message)
+                    }
                 }
     }
 }
\ No newline at end of file
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
new file mode 100644 (file)
index 0000000..3dddf1e
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.api
+
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+enum class HealthState(val message: String, val responseCode: Int) {
+    HEALTHY("Healthy", OK),
+    STARTING("Collector is starting", SERVICE_UNAVAILABLE),
+    WAITING_FOR_CONSUL_CONFIGURATION("Waiting for consul configuration", SERVICE_UNAVAILABLE),
+    CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", SERVICE_UNAVAILABLE)
+}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
new file mode 100644 (file)
index 0000000..5cc09cc
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.api
+
+import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl
+import reactor.core.publisher.Flux
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+interface HealthStateProvider {
+
+    operator fun invoke(): Flux<HealthState>
+    fun changeState(healthState: HealthState)
+
+    companion object {
+        val INSTANCE: HealthStateProvider by lazy {
+            HealthStateProviderImpl()
+        }
+    }
+}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt
new file mode 100644 (file)
index 0000000..5056d2d
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.impl
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import reactor.core.publisher.Flux
+import reactor.core.publisher.FluxProcessor
+import reactor.core.publisher.UnicastProcessor
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+internal class HealthStateProviderImpl : HealthStateProvider {
+
+    private val healthStateStream: FluxProcessor<HealthState, HealthState> = UnicastProcessor.create()
+
+    override fun invoke(): Flux<HealthState> = healthStateStream
+
+    override fun changeState(healthState: HealthState) = healthStateStream.onNext(healthState)
+}
diff --git a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
new file mode 100644 (file)
index 0000000..e9c487b
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.impl
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import reactor.test.StepVerifier
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+object HealthStateProviderImplTest : Spek({
+    describe("Health state provider") {
+            val healthStateProviderImpl = HealthStateProviderImpl()
+            on("health state update") {
+                healthStateProviderImpl.changeState(HealthState.HEALTHY)
+                healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+                healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+                healthStateProviderImpl.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+
+                it("should push new health state to the subscriber") {
+                    StepVerifier
+                            .create(healthStateProviderImpl().take(4))
+                            .expectNext(HealthState.HEALTHY)
+                            .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+                            .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+                            .expectNext(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+                            .verifyComplete()
+                }
+            }
+    }
+})
\ No newline at end of file
index c077440..fb7c7ae 100644 (file)
@@ -5,6 +5,10 @@ LABEL license.name="The Apache Software License, Version 2.0"
 LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
 LABEL maintainer="Nokia Wroclaw ONAP Team"
 
+RUN apt-get update \
+        && apt-get install -y --no-install-recommends curl  \
+        && apt-get clean
+
 WORKDIR /opt/ves-hv-collector
 ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
 COPY target/libs/external/* ./
index 23d7d2e..dc92228 100644 (file)
@@ -23,7 +23,8 @@ import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.boundary.ServerHandle
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.http.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
 import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
@@ -68,7 +69,7 @@ private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also {
 }
 
 private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply {
-    HealthCheckApiServer()
+    HealthCheckApiServer(HealthStateProvider.INSTANCE)
             .start(healthCheckApiPort)
             .unsafeRunSync()
             .also { logger.info("Health check api server started on port ${it.bindPort}") }
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
new file mode 100644 (file)
index 0000000..d20ffac
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.http
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+class Status{
+    companion object {
+        const val OK = 200
+        const val SERVICE_UNAVAILABLE = 503
+    }
+}
diff --git a/pom.xml b/pom.xml
index 19b9892..ac5e1f2 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                 <version>${spek.version}</version>
                 <scope>test</scope>
             </dependency>
-
             <dependency>
                 <groupId>org.assertj</groupId>
                 <artifactId>assertj-core</artifactId>