Merge Healthcheck descriptions 31/75431/6
authorFilip Krzywka <filip.krzywka@nokia.com>
Tue, 8 Jan 2019 06:53:25 +0000 (07:53 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Tue, 8 Jan 2019 11:11:26 +0000 (12:11 +0100)
- start new API server for healthchecks in xNF simulator on port 6063
- changed DCAE App default port to 6064
- switched to InetSocketAddresses usage in components configurations

Change-Id: I398f9ea6e887f78d88286ed717d310d3297b1571
Issue-ID: DCAEGEN2-1063
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
28 files changed:
development/bin/constants.sh [new file with mode: 0755]
development/bin/dcae-msgs.sh
development/bin/dcae-reset.sh
development/bin/dcae-topic.sh
development/bin/start-simulation.sh
development/docker-compose.yml
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt
sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
sources/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
sources/hv-collector-xnf-simulator/pom.xml
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt [new file with mode: 0644]
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt [deleted file]
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt

diff --git a/development/bin/constants.sh b/development/bin/constants.sh
new file mode 100755 (executable)
index 0000000..f0df9b0
--- /dev/null
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+# ============LICENSE_START=======================================================
+# dcaegen2-collectors-veshv
+# ================================================================================
+# Copyright (C) 2019 NOKIA
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+DCAE_APP_HOSTNAME=localhost
+DCAE_APP_PORT=6064
+DCAE_APP_ADDRESS=${DCAE_APP_HOSTNAME}:${DCAE_APP_PORT}
\ No newline at end of file
index cb05a8c..964be14 100755 (executable)
@@ -56,9 +56,12 @@ while getopts "$optspec" arg; do
 done
 shift $((OPTIND-1))
 
+DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0"))
+source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh
+
 if [ -n "${VERBOSE+x}" ]; then
     echo "All messages count currently consumed by dcae app simulator: "
 fi
 
-curl --request GET localhost:6063/messages/all/count
+curl --request GET ${DCAE_APP_ADDRESS}/messages/all/count
 echo
index e5b7b05..03baf97 100755 (executable)
@@ -57,9 +57,12 @@ while getopts "$optspec" arg; do
 done
 shift $((OPTIND-1))
 
+DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0"))
+source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh
+
 if [ -n "${VERBOSE+x}" ]; then
-    echo "Requesting DCAE app running on port 6063 to reset messages count"
+    echo "Requesting DCAE app running on port ${DCAE_APP_PORT} to reset messages count"
 fi
 
-curl --request DELETE localhost:6063/messages
+curl --request DELETE ${DCAE_APP_ADDRESS}/messages
 echo
index 8c17622..aefb1d0 100755 (executable)
@@ -56,11 +56,14 @@ while getopts "$optspec" arg; do
 done
 shift $((OPTIND-1))
 
+DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0"))
+source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh
+
 TOPIC=${1:-HV_VES_PERF3GPP}
 
 if [ -n "${VERBOSE+x}" ]; then
-    echo "Requesting DCAE app running on port 6063 to consume messages from topic: ${TOPIC}"
+    echo "Requesting DCAE app running on ${DCAE_APP_ADDRESS} to consume messages from topic: ${TOPIC}"
 fi
 
-curl --request PUT localhost:6063/configuration/topics -d ${TOPIC}
+curl --request PUT ${DCAE_APP_ADDRESS}/configuration/topics -d ${TOPIC}
 echo
\ No newline at end of file
index beede92..dfb63e6 100755 (executable)
@@ -114,7 +114,7 @@ function wait_for_containers_startup_or_fail() {
 
 function start_simulation() {
     verbose_log "Simulation: every xNF will send ${MESSAGES_IN_BATCH} messages to hv-ves
-    ${MESSAGE_BATCHES_AMOUNT} times, once every ${MESSAGES_SENDING_INTERVAL}"
+    ${MESSAGE_BATCHES_AMOUNT} times, once every ${MESSAGES_SENDING_INTERVAL}s"
     for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do
         start_single_simulation $port $MESSAGES_IN_BATCH &
     done
index adf8947..8b5854c 100644 (file)
@@ -91,12 +91,20 @@ services:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
     ports:
     - "6062:6062/tcp"
+    - "6063:6063"
     command: ["--listen-port", "6062",
+              "--health-check-api-port", "6063",
               "--ves-host", "ves-hv-collector",
               "--ves-port", "6061",
               "--key-store", "/etc/ves-hv/client.p12",
               "--key-store-password", "onaponap",
               "--trust-store-password", "onaponap"]
+    healthcheck:
+      test: curl -f http://localhost:6063/health/ready || exit 1
+      interval: 10s
+      timeout: 3s
+      retries: 3
+      start_period: 10s
     depends_on:
     - ves-hv-collector
     volumes:
@@ -105,8 +113,8 @@ services:
   dcae-app-simulator:
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
     ports:
-    - "6063:6063/tcp"
-    command: ["--listen-port", "6063",
+    - "6064:6064/tcp"
+    command: ["--listen-port", "6064",
               "--kafka-bootstrap-servers", "message-router-kafka:9092",
               "--kafka-topics", "HV_VES_PERF3GPP"]
     depends_on:
index fe2b89d..861065c 100644 (file)
@@ -56,7 +56,7 @@ class CollectorFactory(val configuration: ConfigurationProvider,
                 }
                 .doOnError {
                     logger.error { "Failed to acquire configuration from consul" }
-                    healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+                    healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
                 }
                 .subscribe(config::set)
 
index 87399ca..d58cc79 100644 (file)
@@ -56,7 +56,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
     private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
     private val retry = retrySpec.doOnRetry {
         logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
-        healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+        healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
     }
 
     constructor(http: HttpAdapter,
index a92d376..ccae3c9 100644 (file)
@@ -39,8 +39,6 @@ import reactor.core.publisher.Mono
 import reactor.retry.Retry
 import reactor.test.StepVerifier
 import java.time.Duration
-import java.util.*
-import kotlin.test.assertEquals
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -108,7 +106,7 @@ internal object ConsulConfigurationProviderTest : Spek({
                 it("should update the health state") {
                     StepVerifier.create(healthStateProvider().take(iterationCount))
                             .expectNextCount(iterationCount - 1)
-                            .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+                            .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
                             .verifyComplete()
                 }
             }
index 338c373..75e7cf0 100644 (file)
@@ -314,7 +314,7 @@ object VesHvSpecification : Spek({
             it("should mark the application unhealthy ") {
                 assertThat(sut.healthStateProvider.currentHealth)
                         .describedAs("application health state")
-                        .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+                        .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
             }
         }
     }
index e54eb35..88e01c2 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import ratpack.handling.Chain
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
+import java.net.InetSocketAddress
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -52,11 +53,13 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
     }
 
 
-    fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
+    fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<RatpackServer> =
             simulator.listenToTopics(kafkaTopics).map {
                 RatpackServer.start { server ->
-                    server.serverConfig(ServerConfig.embedded().port(port))
-                            .handlers(::setupHandlers)
+                    server.serverConfig(
+                            ServerConfig.embedded()
+                                    .port(socketAddress.port)
+                    ).handlers(::setupHandlers)
                 }
             }
 
index 17eeb5b..54fd6f4 100644 (file)
@@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.utils.commandline.intValue
 import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import java.net.InetSocketAddress
 
 class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
     override val cmdLineOptionsList: List<CommandLineOption> = listOf(
@@ -59,7 +60,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
                         .bind()
 
                 DcaeAppSimConfiguration(
-                        listenPort,
+                        InetSocketAddress(listenPort),
                         maxPayloadSizeBytes,
                         kafkaBootstrapServers,
                         kafkaTopics)
index a6fc805..2b0382a 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
 
+import java.net.InetSocketAddress
+
 data class DcaeAppSimConfiguration(
-        val apiPort: Int,
+        val apiAddress: InetSocketAddress,
         val maxPayloadSizeBytes: Int,
         val kafkaBootstrapServers: String,
         val kafkaTopics: Set<String>
index 5856f04..abf60b0 100644 (file)
@@ -57,6 +57,6 @@ private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
     val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
     val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
     return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
-            .start(config.apiPort, config.kafkaTopics)
+            .start(config.apiAddress, config.kafkaTopics)
             .unit()
 }
index 7137fe1..055ca19 100644 (file)
@@ -54,7 +54,7 @@ internal class ArgDcaeAppSimConfigurationTest : Spek({
             }
 
             it("should set proper port") {
-                assertThat(result.apiPort).isEqualTo(listenPort.toInt())
+                assertThat(result.apiAddress.port).isEqualTo(listenPort.toInt())
             }
 
 
@@ -79,7 +79,7 @@ internal class ArgDcaeAppSimConfigurationTest : Spek({
             }
 
             it("should set proper port") {
-                assertThat(result.apiPort).isEqualTo(listenPort.toInt())
+                assertThat(result.apiAddress.port).isEqualTo(listenPort.toInt())
             }
 
             it("should set proper kafka bootstrap servers") {
index 8c69406..4758fb6 100644 (file)
@@ -25,8 +25,11 @@ package org.onap.dcae.collectors.veshv.healthcheck.api
  * @since August 2018
  */
 enum class HealthDescription(val message: String, val status: HealthStatus) {
+    STARTING("Component is starting", HealthStatus.OUT_OF_SERVICE),
     HEALTHY("Healthy", HealthStatus.UP),
-    STARTING("Collector is starting", HealthStatus.OUT_OF_SERVICE),
-    RETRYING_FOR_CONSUL_CONFIGURATION("Consul configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE),
-    CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", HealthStatus.DOWN)
+    BUSY("Processing at least one request", HealthStatus.UP),
+    IDLE("No simulation is in progress at the moment", HealthStatus.UP),
+    /* Configuration related */
+    RETRYING_FOR_DYNAMIC_CONFIGURATION("Dynamic configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE),
+    DYNAMIC_CONFIGURATION_NOT_FOUND("Dynamic configuration not found", HealthStatus.DOWN)
 }
index aead71b..7aade34 100644 (file)
@@ -59,7 +59,7 @@ class HealthCheckApiServer(private val healthState: HealthState,
 
     private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
             healthDescription.get().run {
-                logger.debug { "HV-VES status: $status, $message" }
+                logger.debug { "Component status: $status, $message" }
                 resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message))
             }
 
index e3fced2..9a0fe2f 100644 (file)
@@ -35,17 +35,17 @@ object HealthStateProviderImplTest : Spek({
             val healthStateProviderImpl = HealthStateImpl()
             on("health state update") {
                 healthStateProviderImpl.changeState(HealthDescription.HEALTHY)
-                healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
-                healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
-                healthStateProviderImpl.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+                healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+                healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+                healthStateProviderImpl.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
 
                 it("should push new health state to the subscriber") {
                     StepVerifier
                             .create(healthStateProviderImpl().take(4))
                             .expectNext(HealthDescription.HEALTHY)
-                            .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
-                            .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
-                            .expectNext(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+                            .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+                            .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+                            .expectNext(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
                             .verifyComplete()
                 }
             }
index 05d1309..3c2c64a 100644 (file)
@@ -46,10 +46,9 @@ object ExitSuccess : ExitCode() {
 
 data class ExitFailure(override val code: Int) : ExitCode()
 
-fun Either<IO<Unit>, IO<Unit>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
+fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
         flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
 
-
 fun IO<Any>.unit() = map { Unit }
 
 fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
index de26256..33197dd 100644 (file)
             <artifactId>hv-collector-ves-message-generator</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+            <artifactId>hv-collector-health-check</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>${project.parent.groupId}</groupId>
             <artifactId>hv-collector-test-utils</artifactId>
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-effects</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-effects-instances</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.jetbrains.kotlinx</groupId>
             <artifactId>kotlinx-coroutines-core</artifactId>
index ca6d169..75ff895 100644 (file)
@@ -43,8 +43,7 @@ import reactor.netty.tcp.TcpClient
 class VesHvClient(private val configuration: SimulatorConfiguration) {
 
     private val client: TcpClient = TcpClient.create()
-            .host(configuration.vesHost)
-            .port(configuration.vesPort)
+            .addressSupplier { configuration.hvVesAddress }
             .configureSsl()
 
     private fun TcpClient.configureSsl() =
@@ -61,14 +60,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
                 .handle { _, output -> handler(complete, messages, output) }
                 .connect()
                 .doOnError {
-                    logger.info {
-                        "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
-                    }
+                    logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" }
                 }
                 .subscribe {
-                    logger.info {
-                        "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
-                    }
+                    logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" }
                 }
         return complete.then()
     }
index a078562..654f16a 100644 (file)
@@ -23,10 +23,6 @@ import arrow.core.Either
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.BUSY
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.DETAILED_STATUS_NODE
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.IDLE
-import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
 import org.onap.dcae.collectors.veshv.utils.http.Response
 import org.onap.dcae.collectors.veshv.utils.http.Responses
 import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
@@ -38,8 +34,8 @@ import ratpack.handling.Context
 import ratpack.http.TypedData
 import ratpack.server.RatpackServer
 import ratpack.server.ServerConfig
+import java.net.InetSocketAddress
 import java.util.*
-import javax.json.Json
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -49,9 +45,10 @@ internal class XnfApiServer(
         private val xnfSimulator: XnfSimulator,
         private val ongoingSimulations: OngoingSimulations) {
 
-    fun start(port: Int): IO<RatpackServer> = IO {
+    fun start(socketAddress: InetSocketAddress): IO<RatpackServer> = IO {
         RatpackServer.start { server ->
-            server.serverConfig(ServerConfig.embedded().port(port))
+            server.serverConfig(ServerConfig.embedded()
+                    .port(socketAddress.port))
                     .handlers(this::configureHandlers)
         }
     }
@@ -61,7 +58,6 @@ internal class XnfApiServer(
                 .post("simulator", ::startSimulationHandler)
                 .post("simulator/async", ::startSimulationHandler)
                 .get("simulator/:id", ::simulatorStatusHandler)
-                .get("healthcheck", ::healthcheckHandler)
     }
 
     private fun startSimulationHandler(ctx: Context) {
@@ -93,21 +89,6 @@ internal class XnfApiServer(
         ctx.response.sendAndHandleErrors(IO.just(response))
     }
 
-    private fun healthcheckHandler(ctx: Context) {
-        val healthCheckDetailedMessage = createHealthCheckDetailedMessage()
-        val simulatorStatus = HttpConstants.STATUS_OK
-        logger.info { "Returning simulator status: ${simulatorStatus} ${healthCheckDetailedMessage}" }
-        ctx.response.status(simulatorStatus).send(healthCheckDetailedMessage)
-    }
-
-    private fun createHealthCheckDetailedMessage() =
-            Json.createObjectBuilder()
-                    .add(DETAILED_STATUS_NODE, when {
-                        ongoingSimulations.isAnySimulationPending() -> BUSY
-                        else -> IDLE
-                    })
-                    .build().toString()
-
     companion object {
         private val logger = Logger(XnfApiServer::class)
     }
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt
new file mode 100644 (file)
index 0000000..5e1c979
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+
+
+internal class XnfHealthCheckServer {
+    fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config)
+            .start()
+            .map { logger.info(serverStartedMessage(it)); it }
+
+    private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer {
+        val monitoring = object : PrometheusMetricsProvider {
+            override fun lastStatus(): Mono<String> = Mono.just("not implemented")
+        }
+        return HealthCheckApiServer(
+                HealthState.INSTANCE,
+                monitoring,
+                config.healthCheckApiListenAddress)
+    }
+
+    private fun serverStartedMessage(handle: ServerHandle) =
+            { "Health check server is up and listening on ${handle.host}:${handle.port}" }
+
+    companion object {
+        private val logger = Logger(XnfHealthCheckServer::class)
+    }
+}
index 0b32136..7885514 100644 (file)
@@ -28,17 +28,19 @@ import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
 import org.onap.dcae.collectors.veshv.utils.commandline.intValue
 import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import java.net.InetSocketAddress
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -49,6 +51,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
             VES_HV_PORT,
             VES_HV_HOST,
             LISTEN_PORT,
+            HEALTH_CHECK_API_PORT,
             MAXIMUM_PAYLOAD_SIZE_BYTES,
             SSL_DISABLE,
             KEY_STORE_FILE,
@@ -61,14 +64,20 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
                 val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
                 val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
                 val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
+                val healthCheckApiListenAddress = cmdLine.intValue(HEALTH_CHECK_API_PORT,
+                        DefaultValues.HEALTH_CHECK_API_PORT)
                 val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
                         WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
 
                 SimulatorConfiguration(
-                        listenPort,
-                        vesHost,
-                        vesPort,
+                        InetSocketAddress(listenPort),
+                        InetSocketAddress(healthCheckApiListenAddress),
+                        InetSocketAddress(vesHost, vesPort),
                         maxPayloadSizeBytes,
                         createSecurityConfiguration(cmdLine).bind())
             }.fix()
+
+    internal object DefaultValues {
+        const val HEALTH_CHECK_API_PORT = 6063
+    }
 }
index 3395d28..5a0e73c 100644 (file)
 package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
 
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since June 2018
  */
 data class SimulatorConfiguration(
-        val listenPort: Int,
-        val vesHost: String,
-        val vesPort: Int,
+        val listenAddress: InetSocketAddress,
+        val healthCheckApiListenAddress: InetSocketAddress,
+        val hvVesAddress: InetSocketAddress,
         val maxPayloadSizeBytes: Int,
         val security: SecurityConfiguration)
index bd58dd9..fb71b2c 100644 (file)
@@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
 
 import arrow.effects.IO
 import kotlinx.coroutines.asCoroutineDispatcher
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.util.*
@@ -32,13 +35,15 @@ import java.util.concurrent.Executors
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
+class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
+                         private val healthState: HealthState = HealthState.INSTANCE) {
     private val asyncSimulationContext = executor.asCoroutineDispatcher()
     private val simulations = ConcurrentHashMap<UUID, Status>()
 
     fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
         val id = UUID.randomUUID()
         simulations[id] = StatusOngoing
+        updateHealthState()
 
         simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
             result.fold(
@@ -50,20 +55,22 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
                         logger.info { "Finished sending messages" }
                         simulations[id] = StatusSuccess
                     }
-            )
+            ).also { updateHealthState() }
         }
         return id
     }
 
-    fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+    private fun updateHealthState() = healthState.changeState(currentState())
+
+    private fun currentState() = if (isAnySimulationPending()) BUSY else IDLE
 
-    fun isAnySimulationPending() = simulations.any {
+    internal fun isAnySimulationPending() = simulations.any {
         status(it.key) is StatusOngoing
     }
 
-    internal fun clear() {
-        simulations.clear()
-    }
+    fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+
+    internal fun clear() = simulations.clear()
 
     companion object {
         private val logger = Logger(XnfApiServer::class)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt
deleted file mode 100644 (file)
index a86e3d5..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl
-
-// TODO: probably should be merged with HealthDescription or made similiar to it
-internal object XnfStatus {
-
-    const val BUSY = "Busy"
-    const val IDLE = "Idle"
-    const val DETAILED_STATUS_NODE = "Detailed status"
-}
index 91070d3..308c686 100644 (file)
  */
 package org.onap.dcae.collectors.veshv.simulators.xnf
 
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monad.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
 import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.unit
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import ratpack.server.RatpackServer
 
 private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf"
 private val logger = Logger(PACKAGE_NAME)
@@ -41,15 +49,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
  */
 fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
         .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
-        .map { config ->
-            logger.info { "Using configuration: $config" }
-            val xnfSimulator = XnfSimulator(
-                    VesHvClient(config),
-                    MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
-            XnfApiServer(xnfSimulator, OngoingSimulations())
-                    .start(config.listenPort)
-                    .unit()
-        }
+        .map(::startServers)
         .unsafeRunEitherSync(
                 { ex ->
                     logger.withError { log("Failed to start a server", ex) }
@@ -57,5 +57,18 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
                 },
                 {
                     logger.info { "Started xNF Simulator API server" }
+                    HealthState.INSTANCE.changeState(HealthDescription.IDLE)
                 }
         )
+
+private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
+        IO.monad().binding {
+            logger.info { "Using configuration: $config" }
+            XnfHealthCheckServer().startServer(config).bind()
+            val xnfSimulator = XnfSimulator(
+                    VesHvClient(config),
+                    MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+            XnfApiServer(xnfSimulator, OngoingSimulations())
+                    .start(config.listenAddress)
+                    .bind()
+        }.fix()