--- /dev/null
+#!/usr/bin/env bash
+# ============LICENSE_START=======================================================
+# dcaegen2-collectors-veshv
+# ================================================================================
+# Copyright (C) 2019 NOKIA
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+DCAE_APP_HOSTNAME=localhost
+DCAE_APP_PORT=6064
+DCAE_APP_ADDRESS=${DCAE_APP_HOSTNAME}:${DCAE_APP_PORT}
\ No newline at end of file
done
shift $((OPTIND-1))
+DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0"))
+source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh
+
if [ -n "${VERBOSE+x}" ]; then
echo "All messages count currently consumed by dcae app simulator: "
fi
-curl --request GET localhost:6063/messages/all/count
+curl --request GET ${DCAE_APP_ADDRESS}/messages/all/count
echo
done
shift $((OPTIND-1))
+DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0"))
+source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh
+
if [ -n "${VERBOSE+x}" ]; then
- echo "Requesting DCAE app running on port 6063 to reset messages count"
+ echo "Requesting DCAE app running on port ${DCAE_APP_PORT} to reset messages count"
fi
-curl --request DELETE localhost:6063/messages
+curl --request DELETE ${DCAE_APP_ADDRESS}/messages
echo
done
shift $((OPTIND-1))
+DEVELOPMENT_BIN_DIRECTORY=$(realpath $(dirname "$0"))
+source ${DEVELOPMENT_BIN_DIRECTORY}/constants.sh
+
TOPIC=${1:-HV_VES_PERF3GPP}
if [ -n "${VERBOSE+x}" ]; then
- echo "Requesting DCAE app running on port 6063 to consume messages from topic: ${TOPIC}"
+ echo "Requesting DCAE app running on ${DCAE_APP_ADDRESS} to consume messages from topic: ${TOPIC}"
fi
-curl --request PUT localhost:6063/configuration/topics -d ${TOPIC}
+curl --request PUT ${DCAE_APP_ADDRESS}/configuration/topics -d ${TOPIC}
echo
\ No newline at end of file
usage() {
echo "Start xnf-simulator container on given port and inside of given docker-network"
- echo "Usage: $0 [-h|--help] [-v|--verbose] <xnf listen port> [<hv ves docker network>]"
+ echo "Usage: $0 [-h|--help] [-v|--verbose] [--ssl-disable] <xnf listen port> [<hv ves hostname> <hv ves port> <hv ves docker network>]"
+ echo ""
+ echo "Optional parameters:"
+ echo " - ssl-disable : Should xNF simulator be configured without using SSL/TLS connections"
+ echo "Default values:"
+ echo " - hv ves hostname: ves-hv-collector"
+ echo " - hv ves port: 6061"
exit 1
}
case "${OPTARG}" in
verbose)
VERBOSE=True ;;
+ ssl-disable)
+ SSL_DISABLE=True ;;
help)
usage ;;
*)
LISTEN_PORT=$1
-if [ $# -gt 1 ]; then
- HV_VES_NETWORK=${2}
+HV_VES_HOSTNAME=${2:-ves-hv-collector}
+HV_VES_PORT=${3:-6061}
+if [ $# -gt 3 ]; then
+ HV_VES_NETWORK=${4}
fi
PORTS="${LISTEN_PORT}:${LISTEN_PORT}/tcp"
HV_VES_REPO_HOME=$(realpath $(dirname "$0"))/..
+if [ -n "${SSL_DISABLE+x}" ]; then
+ SSL_CONFIGURATION="--ssl-disable"
+else
+ SSL_CONFIGURATION="--key-store-password onaponap --trust-store-password onaponap"
+fi
+
if [ -n "${VERBOSE+x}" ]; then
- echo "Starting xnf-simulator with ports configuration: ${PORTS}"
+ echo "Starting xnf-simulator with "
+ echo " - ports configuration: ${PORTS}"
+ echo " - SSL configuration: ${SSL_CONFIGURATION}"
echo "Container id:"
fi
+
XNF_CONTAINER_ID=$(docker run -d \
-v ${HV_VES_REPO_HOME}/ssl/:/etc/ves-hv/ \
+ --health-cmd='curl -s -f http://localhost:6063/health/ready || exit 1' \
+ --health-interval=5s \
+ --health-retries=3 \
+ --health-start-period='10s' \
-p ${PORTS} \
onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator \
--listen-port ${LISTEN_PORT} \
- --ves-host ves-hv-collector \
- --ves-port 6061 \
- --key-store-password onaponap \
- --trust-store-password onaponap)
+ --health-check-api-port 6063 \
+ --ves-host ${HV_VES_HOSTNAME} \
+ --ves-port ${HV_VES_PORT} \
+ ${SSL_CONFIGURATION})
echo $XNF_CONTAINER_ID
function usage() {
echo ""
echo "Send messages to hv-ves from multiple xNF simulators"
- echo "Usage: $0 [-h|--help] [-v|--verbose] [--messages-in-batch] [--docker-network] [--xnf-logs-directory]"
+ echo "Usage: $0 [-h|--help] [-v|--verbose] [--ssl-disable]"
+ echo " [--messages-in-batch=ARG] [--docker-network=ARG] [--xnf-logs-directory=ARG]"
echo " <hv ves hostname> <hv ves port> <simulators amount> <messages batches amount per simulator> <messages sending interval>"
echo ""
echo " - hv ves hostname : HighVolume VES Collector network hostname"
echo " - hv ves port : HighVolume VES Collector network port"
echo " - simulators amount : Amount of xNF simulators to be launched"
- echo " - messages amount per simulator : Amount of messages to be sent from each xNF simulator to HV-VES"
- echo " - messages sending interval : interval in seconds between sending messages from xNFs"
+ echo " - messages batches amount per simulator : Amount of batches of messages to be sent from each xNF simulator to HV-VES"
+ echo " - messages sending interval : interval in seconds between sending batches of messages from xNFs"
echo "Optional parameters:"
+ echo " - ssl-disable : Should xNF simulator be configured without using SSL/TLS connections"
echo " - messages-in-batch : Amount of messages sent on each request"
echo " - docker-network : Docker network to which xNF simulators should be added"
echo " - xnf-logs-directory : Path to directory where logs from all xNF simulators should be stored"
echo "Example invocations:"
echo "./start-simulation.sh --messages-in-batch=5 --docker-network=development_default ves-hv-collector 6061 10 20 0.5"
echo "./start-simulation.sh --messages-in-batch=5 --xnf-logs-directory=/tmp/xnf-simulation localhost 6061 10 20 0.5"
+ echo "Invocation with remote HV-VES host (Kubernetes slave IP given with default K8S NodePort for HV-VES service):"
+ echo "./start-simulation.sh --ssl-disable --xnf-logs-directory=/tmp/xnf-simulation 10.183.36.78 30222 5 100 5"
exit 1
}
}
function create_xNFs_simulators() {
+ echo "Creating ${XNFS_AMOUNT} xNFs simulators"
+ [ -n "${SSL_DISABLE+x}" ] && verbose_log "--ssl-disable flag will be set inside containers."
for i in $(seq 1 ${XNFS_AMOUNT}); do
local XNF_PORT=$(get_unoccupied_port 32000 65000)
verbose_log "Starting xNF simulator container on port ${XNF_PORT} using run-xnf-simulator script"
- XNF_CONTAINER_ID=$(${DEVELOPMENT_BIN_DIRECTORY}/run-xnf-simulator.sh $XNF_PORT ${DOCKER_NETWORK:-})
+ XNF_CONTAINER_ID=$(${DEVELOPMENT_BIN_DIRECTORY}/run-xnf-simulator.sh ${SSL_DISABLE} $XNF_PORT ${HV_VES_HOSTNAME} ${HV_VES_PORT} ${DOCKER_NETWORK:-})
CREATED_XNF_SIMULATORS_PORTS+=(${XNF_PORT})
verbose_log "Container id: ${XNF_CONTAINER_ID}"
CREATED_XNF_SIMULATORS_IDS+=(${XNF_CONTAINER_ID})
}
function wait_for_containers_startup_or_fail() {
- local seconds_to_wait=10
+ local intervals_amount=30
+ local wait_interval=5
local all_containers_healthy=1
- verbose_log "Waiting ${seconds_to_wait}s for containers startup"
+ verbose_log "Waiting up to ${intervals_amount} times with interval of ${wait_interval}s for containers startup"
set +e
- for i in $(seq 1 ${seconds_to_wait}); do
+ for i in $(seq 1 ${intervals_amount}); do
verbose_log "Try no. ${i}"
all_containers_healthy=1
- for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do
- verbose_log "Checking container on port ${port}"
- local status_code=$(curl -s -o /dev/null -I -w "%{http_code}" localhost:${port}/healthcheck)
- if [ $status_code -ne 200 ]; then
- verbose_log "Container on port ${port} is unhealthy "
+ for id in ${CREATED_XNF_SIMULATORS_IDS[@]}; do
+ verbose_log "Checking container with id ${id}"
+ health=$(docker inspect --format='{{json .State.Health.Status}}' ${id})
+ if [ ${health} != "\"healthy\"" ]; then
+ verbose_log "Container ${id} is not in healthy state. Actual status: ${health}"
all_containers_healthy=0
break
fi
if [ $all_containers_healthy -eq 1 ]; then
break
fi
- sleep 1
+ verbose_log "Sleeping for ${wait_interval}s"
+ sleep $wait_interval
done
set -e
}
function start_simulation() {
- verbose_log "Simulation: every xNF will send ${MESSAGES_IN_BATCH} messages to hv-ves
- ${MESSAGE_BATCHES_AMOUNT} times, once every ${MESSAGES_SENDING_INTERVAL}"
+ verbose_log "Simulation: every xNF will send ${MESSAGES_IN_BATCH} messages to hv-ves ( running on
+ ${HV_VES_HOSTNAME}:${HV_VES_PORT} ) ${MESSAGE_BATCHES_AMOUNT} times, once every ${MESSAGES_SENDING_INTERVAL}s"
for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do
start_single_simulation $port $MESSAGES_IN_BATCH &
done
for i in $(seq 1 ${seconds_to_wait}); do
verbose_log "Wait no. ${i}"
all_containers_finished=1
- for port in ${CREATED_XNF_SIMULATORS_PORTS[@]}; do
- local container_status=$(curl --request GET -s localhost:${port}/healthcheck | jq -r '.["Detailed status"]')
+ for id in ${CREATED_XNF_SIMULATORS_IDS[@]}; do
+ verbose_log "Checking container ${id}"
+ local container_status=$(docker inspect --format='{{json .State.Health.Log }}' ${id} | jq '.[-1] | .Output')
- verbose_log "Container on port ${port} status: ${container_status}"
- if [ "${container_status}" = "Busy" ]; then
+ verbose_log "Container ${id} status: ${container_status}"
+ if [ "${container_status}" != "\"UP\\nNo simulation is in progress at the moment\"" ]; then
all_containers_finished=0
break
fi
echo "All containers finished sending messages"
break
fi
+ verbose_log "Sleeping for 1s"
sleep 1
done
+
+
+ if [ $all_containers_finished -ne 1 ]; then
+ echo "[ERROR] Some xNFs simulators failed to finish sending messages - simulation probably failed"
+ echo "For debug output rerun simulation with -v and --xnf-logs-directory command line options"
+ cleanup
+ echo "Exitting..."
+ exit 3
+ fi
}
function cleanup() {
if [ -n "${XNF_LOGS_DIRECTORY+x}" ]; then
local log_file=${XNF_LOGS_DIRECTORY}/${container_id}.log
verbose_log "Writing container logs to: ${log_file}"
- docker logs ${container_id} > $log_file
+ docker logs ${container_id} &> $log_file
fi
verbose_log "Removing container: ${container_id}"
docker rm $container_id > /dev/null
case "${OPTARG}" in
verbose)
VERBOSE=True ;;
+ ssl-disable)
+ SSL_DISABLE="--ssl-disable" ;;
help)
usage ;;
*)
MESSAGES_SENDING_INTERVAL=${5}
# set defaults if absent
-[ -z "${MESSAGES_IN_BATCH}" ] && MESSAGES_IN_BATCH=1
+[ -z "${MESSAGES_IN_BATCH+x}" ] && MESSAGES_IN_BATCH=1
+[ -z "${SSL_DISABLE+x}" ] && SSL_DISABLE=""
create_logs_dir
CREATED_XNF_SIMULATORS_PORTS=()
CREATED_XNF_SIMULATORS_IDS=()
-echo "Creating ${XNFS_AMOUNT} xNFs simulators"
trap cleanup SIGINT SIGTERM
create_xNFs_simulators
assumed_message_sending_time=$(echo ";0.00025 * $XNFS_AMOUNT" | bc)
seconds_to_wait=$(echo ";$assumed_message_sending_time * $MESSAGE_BATCHES_AMOUNT * $MESSAGES_IN_BATCH" | bc)
+seconds_to_wait=$(echo ";if($seconds_to_wait > 2) $seconds_to_wait else 2" | bc)
wait_for_simulators_to_finish_sending_messages $seconds_to_wait
# there might be network lag between moment when xNF finished sending messages and they actually are received by hv-ves
# thus we cannot start removing xNFs immediately to prevent closing socket channels
"--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
"--kafka-bootstrap-servers", "message-router-kafka:9092",
"--key-store-password", "onaponap",
- "--trust-store-password", "onaponap"]
+ "--trust-store-password", "onaponap",
+ "--first-request-delay", "2",
+ "--log-level", "DEBUG"]
environment:
JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid"
healthcheck:
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
ports:
- "6062:6062/tcp"
+ - "6063:6063"
command: ["--listen-port", "6062",
+ "--health-check-api-port", "6063",
"--ves-host", "ves-hv-collector",
"--ves-port", "6061",
"--key-store", "/etc/ves-hv/client.p12",
"--key-store-password", "onaponap",
"--trust-store-password", "onaponap"]
+ healthcheck:
+ test: curl -f http://localhost:6063/health/ready || exit 1
+ interval: 10s
+ timeout: 3s
+ retries: 3
+ start_period: 10s
depends_on:
- ves-hv-collector
volumes:
dcae-app-simulator:
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
ports:
- - "6063:6063/tcp"
- command: ["--listen-port", "6063",
+ - "6064:6064/tcp"
+ command: ["--listen-port", "6064",
"--kafka-bootstrap-servers", "message-router-kafka:9092",
"--kafka-topics", "HV_VES_PERF3GPP"]
depends_on:
}
.doOnError {
logger.error { "Failed to acquire configuration from consul" }
- healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+ healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
.subscribe(config::set)
private val lastConfigurationHash: AtomicReference<ByteArray> = AtomicReference(byteArrayOf())
private val retry = retrySpec.doOnRetry {
logger.withWarn(ServiceContext::mdc) { log("Could not load fresh configuration", it.exception()) }
- healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
constructor(http: HttpAdapter,
import reactor.retry.Retry
import reactor.test.StepVerifier
import java.time.Duration
-import java.util.*
-import kotlin.test.assertEquals
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
it("should update the health state") {
StepVerifier.create(healthStateProvider().take(iterationCount))
.expectNextCount(iterationCount - 1)
- .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
.verifyComplete()
}
}
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
.describedAs("application health state")
- .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+ .isEqualTo(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
}
}
import ratpack.handling.Chain
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
+import java.net.InetSocketAddress
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
}
- fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
+ fun start(socketAddress: InetSocketAddress, kafkaTopics: Set<String>): IO<RatpackServer> =
simulator.listenToTopics(kafkaTopics).map {
RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
- .handlers(::setupHandlers)
+ server.serverConfig(
+ ServerConfig.embedded()
+ .port(socketAddress.port)
+ ).handlers(::setupHandlers)
}
}
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import java.net.InetSocketAddress
class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
override val cmdLineOptionsList: List<CommandLineOption> = listOf(
.bind()
DcaeAppSimConfiguration(
- listenPort,
+ InetSocketAddress(listenPort),
maxPayloadSizeBytes,
kafkaBootstrapServers,
kafkaTopics)
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
+import java.net.InetSocketAddress
+
data class DcaeAppSimConfiguration(
- val apiPort: Int,
+ val apiAddress: InetSocketAddress,
val maxPayloadSizeBytes: Int,
val kafkaBootstrapServers: String,
val kafkaTopics: Set<String>
val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
- .start(config.apiPort, config.kafkaTopics)
+ .start(config.apiAddress, config.kafkaTopics)
.unit()
}
}
it("should set proper port") {
- assertThat(result.apiPort).isEqualTo(listenPort.toInt())
+ assertThat(result.apiAddress.port).isEqualTo(listenPort.toInt())
}
}
it("should set proper port") {
- assertThat(result.apiPort).isEqualTo(listenPort.toInt())
+ assertThat(result.apiAddress.port).isEqualTo(listenPort.toInt())
}
it("should set proper kafka bootstrap servers") {
* @since August 2018
*/
enum class HealthDescription(val message: String, val status: HealthStatus) {
+ STARTING("Component is starting", HealthStatus.OUT_OF_SERVICE),
HEALTHY("Healthy", HealthStatus.UP),
- STARTING("Collector is starting", HealthStatus.OUT_OF_SERVICE),
- RETRYING_FOR_CONSUL_CONFIGURATION("Consul configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE),
- CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", HealthStatus.DOWN)
+ BUSY("Processing at least one request", HealthStatus.UP),
+ IDLE("No simulation is in progress at the moment", HealthStatus.UP),
+ /* Configuration related */
+ RETRYING_FOR_DYNAMIC_CONFIGURATION("Dynamic configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE),
+ DYNAMIC_CONFIGURATION_NOT_FOUND("Dynamic configuration not found", HealthStatus.DOWN)
}
private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
healthDescription.get().run {
- logger.debug { "HV-VES status: $status, $message" }
+ logger.debug { "Component status: $status, $message" }
resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message))
}
val healthStateProviderImpl = HealthStateImpl()
on("health state update") {
healthStateProviderImpl.changeState(HealthDescription.HEALTHY)
- healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
- healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
- healthStateProviderImpl.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+ healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ healthStateProviderImpl.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
it("should push new health state to the subscriber") {
StepVerifier
.create(healthStateProviderImpl().take(4))
.expectNext(HealthDescription.HEALTHY)
- .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
- .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
- .expectNext(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+ .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ .expectNext(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
+ .expectNext(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
.verifyComplete()
}
}
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv"
private val logger = Logger("$VESHV_PACKAGE.main")
logger.withError { log("Failed to start a server", ex) }
ExitFailure(1)
},
- { logger.info { "Gentle shutdown" } }
+ { logger.info { "Finished" } }
)
private fun startAndAwaitServers(config: ServerConfiguration) =
Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
logger.info { "Using configuration: $config" }
HealthCheckServer.start(config).bind()
- VesServer.start(config).bind()
- .await().bind()
+ VesServer.start(config).bind().run {
+ registerShutdownHook(shutdown()).bind()
+ await().bind()
+ }
}.fix()
-
set -euo pipefail
-java ${JAVA_OPTS:-''} -cp '*:' org.onap.dcae.collectors.veshv.main.MainKt $@
+pid=-1
+
+function handle_sigterm() {
+ if [[ ${pid} -ge 0 ]]; then
+ echo "Caught SIGTERM signal. Redirecting to process with pid=${pid}"
+ kill -TERM "${pid}"
+ wait ${pid}
+ fi
+ exit 143 # 128 + 15 -- SIGTERM
+}
+trap "handle_sigterm" SIGTERM
+
+java ${JAVA_OPTS:-} -cp '*:' org.onap.dcae.collectors.veshv.main.MainKt $@ &
+pid=$!
+echo "Service started with pid=${pid}"
+wait ${pid}
data class ExitFailure(override val code: Int) : ExitCode()
-fun Either<IO<Unit>, IO<Unit>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
+fun <A, B> Either<IO<A>, IO<B>>.unsafeRunEitherSync(onError: (Throwable) -> ExitCode, onSuccess: () -> Unit) =
flatten().attempt().unsafeRunSync().fold({ onError(it).io().unsafeRunSync() }, { onSuccess() })
-
fun IO<Any>.unit() = map { Unit }
fun <T> Mono<T>.asIo() = IO.async<T> { callback ->
logger.trace(context, it)
Mono.just<T>(t)
})
- }
\ No newline at end of file
+ }
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.netty.DisposableServer
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
*/
class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
override fun shutdown() = IO {
- ctx.disposeNow()
+ logger.info { "Graceful shutdown" }
+ ctx.disposeNow(SHUTDOWN_TIMEOUT)
+ logger.info { "Server disposed" }
}
override fun await() = IO<Unit> {
ctx.channel().closeFuture().sync()
}
+
+ companion object {
+ val logger = Logger(NettyServerHandle::class)
+ private val SHUTDOWN_TIMEOUT = Duration.ofSeconds(10)
+ }
}
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+package org.onap.dcae.collectors.veshv.utils
-// TODO: probably should be merged with HealthDescription or made similiar to it
-internal object XnfStatus {
+import arrow.effects.IO
- const val BUSY = "Busy"
- const val IDLE = "Idle"
- const val DETAILED_STATUS_NODE = "Detailed status"
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since January 2019
+ */
+
+fun registerShutdownHook(job: () -> Unit) {
+ Runtime.getRuntime().addShutdownHook(object : Thread() {
+ override fun run() {
+ job()
+ }
+ })
+}
+
+fun registerShutdownHook(job: IO<Unit>) = IO {
+ registerShutdownHook {
+ job.unsafeRunSync()
+ }
}
<artifactId>hv-collector-ves-message-generator</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-health-check</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-test-utils</artifactId>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-effects</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects-instances</artifactId>
+ </dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-core</artifactId>
import reactor.core.publisher.ReplayProcessor
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpClient
+import reactor.util.concurrent.Queues.XS_BUFFER_SIZE
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
class VesHvClient(private val configuration: SimulatorConfiguration) {
private val client: TcpClient = TcpClient.create()
- .host(configuration.vesHost)
- .port(configuration.vesPort)
+ .addressSupplier { configuration.hvVesAddress }
.configureSsl()
private fun TcpClient.configureSsl() =
.handle { _, output -> handler(complete, messages, output) }
.connect()
.doOnError {
- logger.info {
- "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
- }
+ logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" }
}
.subscribe {
- logger.info {
- "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
- }
+ logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" }
}
return complete.then()
}
val encoder = WireFrameEncoder(allocator)
val frames = messages
.map(encoder::encode)
- .window(MAX_BATCH_SIZE)
+ .window(XS_BUFFER_SIZE)
return nettyOutbound
.logConnectionClosed()
private fun NettyOutbound.logConnectionClosed() =
withConnection { conn ->
- conn.onTerminate().subscribe {
+ conn.onDispose {
logger.info { "Connection to ${conn.address()} has been closed" }
}
}
companion object {
private val logger = Logger(VesHvClient::class)
- private const val MAX_BATCH_SIZE = 128
}
}
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.BUSY
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.DETAILED_STATUS_NODE
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.IDLE
-import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
import org.onap.dcae.collectors.veshv.utils.http.Response
import org.onap.dcae.collectors.veshv.utils.http.Responses
import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
import ratpack.http.TypedData
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
+import java.net.InetSocketAddress
import java.util.*
-import javax.json.Json
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
private val xnfSimulator: XnfSimulator,
private val ongoingSimulations: OngoingSimulations) {
- fun start(port: Int): IO<RatpackServer> = IO {
+ fun start(socketAddress: InetSocketAddress): IO<RatpackServer> = IO {
RatpackServer.start { server ->
- server.serverConfig(ServerConfig.embedded().port(port))
+ server.serverConfig(ServerConfig.embedded()
+ .port(socketAddress.port))
.handlers(this::configureHandlers)
}
}
.post("simulator", ::startSimulationHandler)
.post("simulator/async", ::startSimulationHandler)
.get("simulator/:id", ::simulatorStatusHandler)
- .get("healthcheck", ::healthcheckHandler)
}
private fun startSimulationHandler(ctx: Context) {
ctx.response.sendAndHandleErrors(IO.just(response))
}
- private fun healthcheckHandler(ctx: Context) {
- val healthCheckDetailedMessage = createHealthCheckDetailedMessage()
- val simulatorStatus = HttpConstants.STATUS_OK
- logger.info { "Returning simulator status: ${simulatorStatus} ${healthCheckDetailedMessage}" }
- ctx.response.status(simulatorStatus).send(healthCheckDetailedMessage)
- }
-
- private fun createHealthCheckDetailedMessage() =
- Json.createObjectBuilder()
- .add(DETAILED_STATUS_NODE, when {
- ongoingSimulations.isAnySimulationPending() -> BUSY
- else -> IDLE
- })
- .build().toString()
-
companion object {
private val logger = Logger(XnfApiServer::class)
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+
+
+internal class XnfHealthCheckServer {
+ fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config)
+ .start()
+ .map { logger.info(serverStartedMessage(it)); it }
+
+ private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer {
+ val monitoring = object : PrometheusMetricsProvider {
+ override fun lastStatus(): Mono<String> = Mono.just("not implemented")
+ }
+ return HealthCheckApiServer(
+ HealthState.INSTANCE,
+ monitoring,
+ config.healthCheckApiListenAddress)
+ }
+
+ private fun serverStartedMessage(handle: ServerHandle) =
+ { "Health check server is up and listening on ${handle.host}:${handle.port}" }
+
+ companion object {
+ private val logger = Logger(XnfHealthCheckServer::class)
+ }
+}
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import java.net.InetSocketAddress
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
VES_HV_PORT,
VES_HV_HOST,
LISTEN_PORT,
+ HEALTH_CHECK_API_PORT,
MAXIMUM_PAYLOAD_SIZE_BYTES,
SSL_DISABLE,
KEY_STORE_FILE,
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
+ val healthCheckApiListenAddress = cmdLine.intValue(HEALTH_CHECK_API_PORT,
+ DefaultValues.HEALTH_CHECK_API_PORT)
val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
SimulatorConfiguration(
- listenPort,
- vesHost,
- vesPort,
+ InetSocketAddress(listenPort),
+ InetSocketAddress(healthCheckApiListenAddress),
+ InetSocketAddress(vesHost, vesPort),
maxPayloadSizeBytes,
createSecurityConfiguration(cmdLine).bind())
}.fix()
+
+ internal object DefaultValues {
+ const val HEALTH_CHECK_API_PORT = 6063
+ }
}
package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
data class SimulatorConfiguration(
- val listenPort: Int,
- val vesHost: String,
- val vesPort: Int,
+ val listenAddress: InetSocketAddress,
+ val healthCheckApiListenAddress: InetSocketAddress,
+ val hvVesAddress: InetSocketAddress,
val maxPayloadSizeBytes: Int,
val security: SecurityConfiguration)
import arrow.effects.IO
import kotlinx.coroutines.asCoroutineDispatcher
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.*
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
+class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(),
+ private val healthState: HealthState = HealthState.INSTANCE) {
private val asyncSimulationContext = executor.asCoroutineDispatcher()
private val simulations = ConcurrentHashMap<UUID, Status>()
fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
val id = UUID.randomUUID()
simulations[id] = StatusOngoing
+ updateHealthState()
simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
result.fold(
logger.info { "Finished sending messages" }
simulations[id] = StatusSuccess
}
- )
+ ).also { updateHealthState() }
}
return id
}
- fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+ private fun updateHealthState() = healthState.changeState(currentState())
+
+ private fun currentState() = if (isAnySimulationPending()) BUSY else IDLE
- fun isAnySimulationPending() = simulations.any {
+ internal fun isAnySimulationPending() = simulations.any {
status(it.key) is StatusOngoing
}
- internal fun clear() {
- simulations.clear()
- }
+ fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+
+ internal fun clear() = simulations.clear()
companion object {
private val logger = Logger(XnfApiServer::class)
*/
package org.onap.dcae.collectors.veshv.simulators.xnf
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.instances.io.monad.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
-import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import ratpack.server.RatpackServer
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf"
private val logger = Logger(PACKAGE_NAME)
*/
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map { config ->
- logger.info { "Using configuration: $config" }
- val xnfSimulator = XnfSimulator(
- VesHvClient(config),
- MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
- XnfApiServer(xnfSimulator, OngoingSimulations())
- .start(config.listenPort)
- .unit()
- }
+ .map(::startServers)
.unsafeRunEitherSync(
{ ex ->
logger.withError { log("Failed to start a server", ex) }
},
{
logger.info { "Started xNF Simulator API server" }
+ HealthState.INSTANCE.changeState(HealthDescription.IDLE)
}
)
+
+private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
+ IO.monad().binding {
+ logger.info { "Using configuration: $config" }
+ XnfHealthCheckServer().startServer(config).bind()
+ val xnfSimulator = XnfSimulator(
+ VesHvClient(config),
+ MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ XnfApiServer(xnfSimulator, OngoingSimulations())
+ .start(config.listenAddress)
+ .bind()
+ }.fix()