Merge "Log details about header validation failure"
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>
Thu, 13 Dec 2018 13:20:51 +0000 (13:20 +0000)
committerGerrit Code Review <gerrit@onap.org>
Thu, 13 Dec 2018 13:20:51 +0000 (13:20 +0000)
59 files changed:
INFO.yaml
development/bin/consul.sh [new file with mode: 0755]
development/bin/dcae-msgs.sh [new file with mode: 0755]
development/bin/dcae-reset.sh [new file with mode: 0755]
development/bin/dcae-topic.sh [new file with mode: 0755]
development/bin/run-xnf-simulator.sh [new file with mode: 0755]
development/bin/xnf-simulation.sh [new file with mode: 0755]
development/docker-compose.yml [new file with mode: 0644]
development/ssl/.gitignore [moved from ssl/.gitignore with 100% similarity]
development/ssl/Makefile-openssl [moved from ssl/Makefile-openssl with 100% similarity]
development/ssl/README.md [moved from ssl/README.md with 100% similarity]
development/ssl/gen-certs.sh [moved from ssl/gen-certs.sh with 100% similarity]
docker-compose.yml [deleted file]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt [new file with mode: 0644]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt [new file with mode: 0644]
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
sources/hv-collector-core/src/test/resources/logback-test.xml
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
sources/hv-collector-ct/src/test/resources/logback-test.xml
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/DcaeAppApiServer.kt
sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
sources/hv-collector-domain/src/test/resources/logback.xml [deleted file]
sources/hv-collector-health-check/pom.xml
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
sources/hv-collector-main/src/main/resources/logback.xml
sources/hv-collector-ssl/src/test/resources/logback-test.xml
sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/assertions.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/ratpack.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt [new file with mode: 0644]
sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
sources/hv-collector-utils/src/test/resources/logback-test.xml
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
sources/hv-collector-xnf-simulator/src/main/resources/logback.xml

index 423d1ba..6176c0f 100644 (file)
--- a/INFO.yaml
+++ b/INFO.yaml
@@ -62,10 +62,23 @@ committers:
       company: 'ATT'
       id: 'jflucas'
       timezone: 'America/New_York'
-    - name: 'Przemyslaw Wasala '
-      email: 'przemyslaw.wasala@nokia.com'
+    - name: 'Piotr Jaszczyk'
+      email: 'piotr.jaszczyk@nokia.com'
       company: 'Nokia'
-      id: 'przemyslaw.wasala'
+      id: 'jaszczur'
+      timezone: 'Europe/Warsaw'
+    - name: 'Piotr Wielebski'
+      email: 'piotr.wielebski@nokia.com'
+      company: 'Nokia'
+      id: 'pwielebs'
       timezone: 'Europe/Warsaw'
 tsc:
     approval: 'https://lists.onap.org/pipermail/onap-tsc'
+    changes: 
+        - type: 'Removal'
+          name: 'Przemyslaw Wasala'
+          link: 'https://lists.onap.org/g/onap-tsc/message/4248'
+        - type: 'Addition'
+          name: 'Piotr Jaszczyk'
+          name: 'Piotr Wielebski'
+          link: 'https://lists.onap.org/g/onap-tsc/message/4259'
diff --git a/development/bin/consul.sh b/development/bin/consul.sh
new file mode 100755 (executable)
index 0000000..c229f83
--- /dev/null
@@ -0,0 +1,79 @@
+#!/usr/bin/env bash
+# ============LICENSE_START=======================================================
+# dcaegen2-collectors-veshv
+# ================================================================================
+# Copyright (C) 2018 NOKIA
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+set -euo pipefail
+
+usage() {
+    echo "Put HV-VES configuration into Consul key-value store"
+    echo "Usage: $0 [-h|--help] [-v|--verbose] [domain [topic]]"
+    exit 1
+}
+
+optspec=":vh-:" # catch v, h and -
+while getopts "$optspec" arg; do
+    case "${arg}" in
+        -) # handle longopts
+            case "${OPTARG}" in
+                verbose)
+                    VERBOSE=True
+                    ;;
+                help)
+                    usage
+                    ;;
+                *)
+                    echo "Unknown option --${OPTARG}" >&2
+                    usage
+                    ;;
+             esac
+             ;;
+        v)
+            VERBOSE=True
+            ;;
+        h)
+            usage
+            ;;
+        *)
+            echo "Unknown option -${OPTARG}" >&2
+            usage
+            ;;
+    esac
+done
+shift $((OPTIND-1))
+
+DOMAIN=${1:-perf3gpp}
+TOPIC=${2:-HV_VES_PERF3GPP}
+
+CONFIGURATION="
+{
+    \"dmaap.kafkaBootstrapServers\": \"message-router-kafka:9092\",
+    \"collector.routing\":
+        [{
+            \"fromDomain\": \"${DOMAIN}\",
+            \"toTopic\": \"${TOPIC}\"
+        }]
+}"
+CONFIGURATION_ENDPOINT=localhost:8500/v1/kv/veshv-config
+
+
+if [ -n "${VERBOSE+x}" ]; then
+    echo "Configuration: ${CONFIGURATION}"
+    echo "Putting configuration under ${CONFIGURATION_ENDPOINT}."
+fi
+curl --request PUT ${CONFIGURATION_ENDPOINT} -d "${CONFIGURATION}"
+echo
diff --git a/development/bin/dcae-msgs.sh b/development/bin/dcae-msgs.sh
new file mode 100755 (executable)
index 0000000..cb05a8c
--- /dev/null
@@ -0,0 +1,64 @@
+#!/usr/bin/env bash
+# ============LICENSE_START=======================================================
+# dcaegen2-collectors-veshv
+# ================================================================================
+# Copyright (C) 2018 NOKIA
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+set -euo pipefail
+
+usage() {
+    echo "Return current amount of consumed messages by dcae-app-simulator"
+    echo "Usage: $0 [-h|--help] [-v|--verbose]"
+    exit 1
+}
+
+optspec=":vh-:" # catch v, h and -
+while getopts "$optspec" arg; do
+    case "${arg}" in
+        -) # handle longopts
+            case "${OPTARG}" in
+                verbose)
+                    VERBOSE=True
+                    ;;
+                help)
+                    usage
+                    ;;
+                *)
+                    echo "Unknown option --${OPTARG}" >&2
+                    usage
+                    ;;
+             esac
+             ;;
+        v)
+            VERBOSE=True
+            ;;
+        h)
+            usage
+            ;;
+        *)
+            echo "Unknown option -${OPTARG}" >&2
+            usage
+            ;;
+    esac
+done
+shift $((OPTIND-1))
+
+if [ -n "${VERBOSE+x}" ]; then
+    echo "All messages count currently consumed by dcae app simulator: "
+fi
+
+curl --request GET localhost:6063/messages/all/count
+echo
diff --git a/development/bin/dcae-reset.sh b/development/bin/dcae-reset.sh
new file mode 100755 (executable)
index 0000000..e5b7b05
--- /dev/null
@@ -0,0 +1,65 @@
+#!/usr/bin/env bash
+# ============LICENSE_START=======================================================
+# dcaegen2-collectors-veshv
+# ================================================================================
+# Copyright (C) 2018 NOKIA
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+set -euo pipefail
+
+
+usage() {
+    echo "Resets dcae-app-simulator consumed messages count"
+    echo "Usage: $0 [-h|--help] [-v|--verbose]"
+    exit 1
+}
+
+optspec=":vh-:" # catch v, h and -
+while getopts "$optspec" arg; do
+    case "${arg}" in
+        -) # handle longopts
+            case "${OPTARG}" in
+                verbose)
+                    VERBOSE=True
+                    ;;
+                help)
+                    usage
+                    ;;
+                *)
+                    echo "Unknown option --${OPTARG}" >&2
+                    usage
+                    ;;
+             esac
+             ;;
+        v)
+            VERBOSE=True
+            ;;
+        h)
+            usage
+            ;;
+        *)
+            echo "Unknown option -${OPTARG}" >&2
+            usage
+            ;;
+    esac
+done
+shift $((OPTIND-1))
+
+if [ -n "${VERBOSE+x}" ]; then
+    echo "Requesting DCAE app running on port 6063 to reset messages count"
+fi
+
+curl --request DELETE localhost:6063/messages
+echo
diff --git a/development/bin/dcae-topic.sh b/development/bin/dcae-topic.sh
new file mode 100755 (executable)
index 0000000..8c17622
--- /dev/null
@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+# ============LICENSE_START=======================================================
+# dcaegen2-collectors-veshv
+# ================================================================================
+# Copyright (C) 2018 NOKIA
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+set -euo pipefail
+
+usage() {
+    echo "Set dcae-app-simulator to start consuming messages from given topic (HV_VES_PERF3GPP by default)"
+    echo "Usage: $0 [-h|--help] [-v|--verbose] [topic]"
+    exit 1
+}
+
+optspec=":vh-:" # catch v, h and -
+while getopts "$optspec" arg; do
+    case "${arg}" in
+        -) # handle longopts
+            case "${OPTARG}" in
+                verbose)
+                    VERBOSE=True
+                    ;;
+                help)
+                    usage
+                    ;;
+                *)
+                    echo "Unknown option --${OPTARG}" >&2
+                    usage
+                    ;;
+             esac
+             ;;
+        v)
+            VERBOSE=True
+            ;;
+        h)
+            usage
+            ;;
+        *)
+            echo "Unknown option -${OPTARG}" >&2
+            usage
+            ;;
+    esac
+done
+shift $((OPTIND-1))
+
+TOPIC=${1:-HV_VES_PERF3GPP}
+
+if [ -n "${VERBOSE+x}" ]; then
+    echo "Requesting DCAE app running on port 6063 to consume messages from topic: ${TOPIC}"
+fi
+
+curl --request PUT localhost:6063/configuration/topics -d ${TOPIC}
+echo
\ No newline at end of file
diff --git a/development/bin/run-xnf-simulator.sh b/development/bin/run-xnf-simulator.sh
new file mode 100755 (executable)
index 0000000..3fe9692
--- /dev/null
@@ -0,0 +1,81 @@
+#!/usr/bin/env bash
+# ============LICENSE_START=======================================================
+# dcaegen2-collectors-veshv
+# ================================================================================
+# Copyright (C) 2018 NOKIA
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+set -euo pipefail
+
+usage() {
+    echo "Start xnf-simulator container on given port and inside of given docker-network (by default 'development_default')"
+    echo "Usage: $0 [-h|--help] [-v|--verbose] <xnf listen port> [<hv ves docker network>]"
+    exit 1
+}
+
+optspec=":vh-:" # catch v, h and -
+while getopts "$optspec" arg; do
+    case "${arg}" in
+        -) # handle longopts
+            case "${OPTARG}" in
+                verbose)
+                    VERBOSE=True
+                    ;;
+                help)
+                    usage
+                    ;;
+                *)
+                    echo "Unknown option --${OPTARG}" >&2
+                    usage
+                    ;;
+             esac
+             ;;
+        v)
+            VERBOSE=True
+            ;;
+        h)
+            usage
+            ;;
+        *)
+            echo "Unknown option -${OPTARG}" >&2
+            usage
+            ;;
+    esac
+done
+shift $((OPTIND-1))
+
+[ $# -eq 0 ] && usage
+
+
+LISTEN_PORT=$1
+HV_VES_NETWORK=${2:-development_default}
+
+PORTS="${LISTEN_PORT}:${LISTEN_PORT}/tcp"
+HV_VES_REPO_HOME=`pwd`/..
+
+if [ -n "${VERBOSE+x}" ]; then
+    echo "Starting xnf-simulator with ports configuration: ${PORTS} on network: ${HV_VES_NETWORK}"
+    echo "Container id:"
+fi
+docker run -d \
+           -v ${HV_VES_REPO_HOME}/ssl/:/etc/ves-hv/ \
+           -p ${PORTS} \
+           --network ${HV_VES_NETWORK} \
+           onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator \
+                    --listen-port ${LISTEN_PORT} \
+                    --ves-host ves-hv-collector \
+                    --ves-port 6061 \
+                    --key-store-password onaponap \
+                    --trust-store-password onaponap
\ No newline at end of file
diff --git a/development/bin/xnf-simulation.sh b/development/bin/xnf-simulation.sh
new file mode 100755 (executable)
index 0000000..e1d65aa
--- /dev/null
@@ -0,0 +1,103 @@
+#!/usr/bin/env bash
+# ============LICENSE_START=======================================================
+# dcaegen2-collectors-veshv
+# ================================================================================
+# Copyright (C) 2018 NOKIA
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+set -euo pipefail
+
+usage() {
+    echo "Send request to xnf-simulator"
+    echo "Usage: $0 [-h|--help] [-v|--verbose] [<xnf listen port> [<messages amount> [<messages type> [<xnf endpoint>]]]]"
+    exit 1
+}
+
+optspec=":vh-:" # catch v, h and -
+while getopts "$optspec" arg; do
+    case "${arg}" in
+        -) # handle longopts
+            case "${OPTARG}" in
+                verbose)
+                    VERBOSE=True
+                    ;;
+                help)
+                    usage
+                    ;;
+                *)
+                    echo "Unknown option --${OPTARG}" >&2
+                    usage
+                    ;;
+             esac
+             ;;
+        v)
+            VERBOSE=True
+            ;;
+        h)
+            usage
+            ;;
+        *)
+            echo "Unknown option -${OPTARG}" >&2
+            usage
+            ;;
+    esac
+done
+shift $((OPTIND-1))
+
+XNF_PORT=${1:-6062}
+MESSAGES_AMOUNT=${2:-1}
+MESSAGES_TYPE=${3:-VALID}
+XNF_ENDPOINT=simulator/async
+
+if [ -n "${VERBOSE+x}" ]; then
+    echo "Requesting xnf-simulator on port ${XNF_PORT} to send ${MESSAGES_AMOUNT} messages of type ${MESSAGES_TYPE}"
+fi
+
+REQUEST_ID=$(curl --request POST -s localhost:${XNF_PORT}/${XNF_ENDPOINT} -d "
+[
+  {
+    \"commonEventHeader\": {
+      \"version\": \"sample-version\",
+      \"domain\": \"perf3gpp\",
+      \"sequence\": 1,
+      \"priority\": 1,
+      \"eventId\": \"sample-event-id\",
+      \"eventName\": \"sample-event-name\",
+      \"eventType\": \"sample-event-type\",
+      \"startEpochMicrosec\": 120034455,
+      \"lastEpochMicrosec\": 120034455,
+      \"nfNamingCode\": \"sample-nf-naming-code\",
+      \"nfcNamingCode\": \"sample-nfc-naming-code\",
+      \"reportingEntityId\": \"sample-reporting-entity-id\",
+      \"reportingEntityName\": \"sample-reporting-entity-name\",
+      \"sourceId\": \"sample-source-id\",
+      \"sourceName\": \"sample-source-name\",
+      \"vesEventListenerVersion\": \"7.2.0\"
+    },
+    \"messageType\": \"${MESSAGES_TYPE}\",
+    \"messagesAmount\": ${MESSAGES_AMOUNT}
+  }
+]")
+
+if [ -n "${VERBOSE+x}" ]; then
+    echo -e "Request id: ${REQUEST_ID}\n"
+
+    echo "To check request status execute:"
+    echo "curl --request GET localhost:${XNF_PORT}/simulator/${REQUEST_ID}"
+    echo "To further debug you can try something similiar to:"
+    echo "docker ps -a | grep ${XNF_PORT} | awk '{ print \$1 }' | xargs docker logs"
+else
+    echo "${REQUEST_ID}"
+fi
\ No newline at end of file
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
new file mode 100644 (file)
index 0000000..d4c3f1d
--- /dev/null
@@ -0,0 +1,112 @@
+version: "3.5"
+services:
+
+  #
+  # DMaaP Message Router
+  #
+
+  message-router-zookeeper:
+    image: wurstmeister/zookeeper
+    ports:
+    - "2181:2181"
+
+  message-router-kafka:
+#    image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1
+    image: wurstmeister/kafka
+    ports:
+    - "9092:9092"
+    environment:
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181"
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
+      KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://message-router-kafka:9092"
+      KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092"
+      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT"
+    volumes:
+    - /var/run/docker.sock:/var/run/docker.sock
+    depends_on:
+    - message-router-zookeeper
+
+
+  #
+  # Consul / CBS
+  #
+
+  consul-server:
+    image: docker.io/consul:1.0.6
+    ports:
+    - "8500:8500"
+    command: ["agent","-bootstrap", "-client=0.0.0.0", "-server", "-ui"]
+
+  consul-config:
+    image: consul
+    depends_on:
+    - consul-server
+    restart: on-failure
+    command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{
+                                              "dmaap.kafkaBootstrapServers": "message-router-kafka:9092",
+                                              "collector.routing": [
+                                                {
+                                                  "fromDomain": "perf3gpp",
+                                                  "toTopic": "HV_VES_PERF3GPP"
+                                                }
+                                              ]
+                                            }']
+
+
+  #
+  # DCAE HV VES Collector
+  #
+
+  ves-hv-collector:
+    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
+    ports:
+    - "6060:6060"
+    - "6061:6061/tcp"
+    entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
+                 "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
+    command: ["--listen-port", "6061",
+              "--health-check-api-port", "6060",
+              "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
+              "--key-store-password", "onaponap",
+              "--trust-store-password", "onaponap"]
+    healthcheck:
+      test: curl -f http://localhost:6060/health/ready || exit 1
+      interval: 10s
+      timeout: 3s
+      retries: 3
+      start_period: 20s
+    depends_on:
+    - message-router-kafka
+    - consul-server
+    volumes:
+    - ./ssl/:/etc/ves-hv/
+
+
+  #
+  # Simulators
+  #
+
+  xnf-simulator:
+    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
+    ports:
+    - "6062:6062/tcp"
+    command: ["--listen-port", "6062",
+              "--ves-host", "ves-hv-collector",
+              "--ves-port", "6061",
+              "--key-store-password", "onaponap",
+              "--trust-store-password", "onaponap"]
+    depends_on:
+    - ves-hv-collector
+    volumes:
+      - ./ssl/:/etc/ves-hv/
+
+  dcae-app-simulator:
+    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
+    ports:
+    - "6063:6063/tcp"
+    command: ["--listen-port", "6063",
+              "--kafka-bootstrap-servers", "message-router-kafka:9092",
+              "--kafka-topics", "HV_VES_PERF3GPP"]
+    depends_on:
+    - message-router-kafka
similarity index 100%
rename from ssl/.gitignore
rename to development/ssl/.gitignore
similarity index 100%
rename from ssl/README.md
rename to development/ssl/README.md
diff --git a/docker-compose.yml b/docker-compose.yml
deleted file mode 100644 (file)
index 4015b08..0000000
+++ /dev/null
@@ -1,85 +0,0 @@
-version: "3.5"
-services:
-  zookeeper:
-    image: wurstmeister/zookeeper
-    ports:
-      - "2181:2181"
-
-  kafka:
-    image: wurstmeister/kafka
-    ports:
-      - "9092:9092"
-    environment:
-      KAFKA_ADVERTISED_HOST_NAME: "kafka"
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
-      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
-    volumes:
-      - /var/run/docker.sock:/var/run/docker.sock
-    depends_on:
-      - zookeeper
-
-  consul:
-      image: progrium/consul
-      ports:
-        - "8500:8500"
-      environment:
-        - CONSUL_BIND_INTERFACE=eth0
-      command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
-
-  ves-hv-collector:
-    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-main:latest
-#    build:
-#      context: hv-collector-main
-#      dockerfile: Dockerfile
-    ports:
-      - "6060:6060"
-      - "6061:6061/tcp"
-    entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
-                 "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
-    command: ["--listen-port", "6061",
-              "--health-check-api-port", "6060",
-              "--config-url", "http://consul:8500/v1/kv/veshv-config?raw=true",
-              "--key-store-password", "onaponap",
-              "--trust-store-password", "onaponap"]
-    healthcheck:
-      test: curl -f http://localhost:6060/health/ready || exit 1
-      interval: 10s
-      timeout: 3s
-      retries: 3
-      start_period: 20s
-    depends_on:
-      - kafka
-      - consul
-    volumes:
-      - ./ssl/:/etc/ves-hv/
-
-  xnf-simulator:
-    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-xnf-simulator
-#    build:
-#      context: hv-collector-xnf-simulator
-#      dockerfile: Dockerfile
-    ports:
-      - "6062:6062/tcp"
-    command: ["--listen-port", "6062",
-              "--ves-host", "ves-hv-collector",
-              "--ves-port", "6061",
-              "--key-store-password", "onaponap",
-              "--trust-store-password", "onaponap"]
-    depends_on:
-      - ves-hv-collector
-    volumes:
-      - ./ssl/:/etc/ves-hv/
-
-  dcae-app-simulator:
-    image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-dcae-app-simulator
-#    build:
-#      context: hv-collector-dcae-app-simulator
-#      dockerfile: Dockerfile
-    ports:
-      - "6063:6063/tcp"
-    command: ["--listen-port", "6063",
-              "--kafka-bootstrap-servers", "kafka:9092",
-              "--kafka-topics", "HV_VES_PERF3GPP"]
-    depends_on:
-      - kafka
index dd0111b..b686b25 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import reactor.core.publisher.Flux
@@ -35,12 +36,12 @@ interface Metrics {
 
 @FunctionalInterface
 interface SinkProvider {
-    operator fun invoke(config: CollectorConfiguration): Sink
+    operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
 
     companion object {
         fun just(sink: Sink): SinkProvider =
                 object : SinkProvider {
-                    override fun invoke(config: CollectorConfiguration): Sink = sink
+                    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
                 }
     }
 }
index 3c85a9b..5584d61 100644 (file)
@@ -23,15 +23,17 @@ import arrow.core.Option
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import java.util.*
 
 interface Collector {
-    fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
+    fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
 }
 
-typealias CollectorProvider = () -> Option<Collector>
+typealias CollectorProvider = (ClientContext) -> Option<Collector>
 
 interface Server {
     fun start(): IO<ServerHandle>
index 5c96e1c..2008fc3 100644 (file)
@@ -25,12 +25,13 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -47,31 +48,29 @@ class CollectorFactory(val configuration: ConfigurationProvider,
                        private val healthState: HealthState = HealthState.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
-        val collector: AtomicReference<Collector> = AtomicReference()
+        val config: AtomicReference<CollectorConfiguration> = AtomicReference()
         configuration()
-                .map(this::createVesHvCollector)
                 .doOnNext {
-                    logger.info("Using updated configuration for new connections")
+                    logger.info { "Using updated configuration for new connections" }
                     healthState.changeState(HealthDescription.HEALTHY)
                 }
                 .doOnError {
-                    logger.error("Failed to acquire configuration from consul")
+                    logger.error { "Failed to acquire configuration from consul" }
                     healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
                 }
-                .subscribe(collector::set)
-        return collector::getOption
+                .subscribe(config::set)
+        return { ctx: ClientContext ->
+            config.getOption().map { config -> createVesHvCollector(config, ctx) }
+        }
     }
 
-    private fun createVesHvCollector(config: CollectorConfiguration): Collector {
-        return VesHvCollector(
-                wireChunkDecoderSupplier = { alloc ->
-                    WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
-                },
-                protobufDecoder = VesDecoder(),
-                router = Router(config.routing),
-                sink = sinkProvider(config),
-                metrics = metrics)
-    }
+    private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
+            clientContext = ctx,
+            wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+            protobufDecoder = VesDecoder(),
+            router = Router(config.routing, ctx),
+            sink = sinkProvider(config, ctx),
+            metrics = metrics)
 
     companion object {
         private val logger = Logger(CollectorFactory::class)
index cee658b..6105b58 100644 (file)
 package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Option
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.Routing
 import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
-class Router(private val routing: Routing) {
+class Router(private val routing: Routing, private val ctx: ClientContext) {
     fun findDestination(message: VesMessage): Option<RoutedMessage> =
-            routing.routeFor(message.header).map { it(message) }
+            routing.routeFor(message.header).map { it(message) }.also {
+                if (it.isEmpty()) {
+                    logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" }
+                }
+            }
+
+    companion object {
+        private val logger = Logger(Routing::class)
+    }
 }
index 4176de9..cf73aed 100644 (file)
@@ -21,19 +21,19 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Either
 import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
@@ -42,28 +42,27 @@ import reactor.core.publisher.Mono
  * @since May 2018
  */
 internal class VesHvCollector(
-        private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
+        private val clientContext: ClientContext,
+        private val wireChunkDecoder: WireChunkDecoder,
         private val protobufDecoder: VesDecoder,
         private val router: Router,
         private val sink: Sink,
         private val metrics: Metrics) : Collector {
 
-    override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
-            wireChunkDecoderSupplier(alloc).let { wireDecoder ->
-                dataStream
-                        .transform { decodeWireFrame(it, wireDecoder) }
-                        .transform(::filterInvalidWireFrame)
-                        .transform(::decodeProtobufPayload)
-                        .transform(::filterInvalidProtobufMessages)
-                        .transform(::routeMessage)
-                        .onErrorResume { logger.handleReactiveStreamError(it) }
-                        .doFinally { releaseBuffersMemory(wireDecoder) }
-                        .then()
-            }
+    override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
+            dataStream
+                    .transform { decodeWireFrame(it) }
+                    .transform(::filterInvalidWireFrame)
+                    .transform(::decodeProtobufPayload)
+                    .transform(::filterInvalidProtobufMessages)
+                    .transform(::routeMessage)
+                    .onErrorResume { logger.handleReactiveStreamError(clientContext, it) }
+                    .doFinally { releaseBuffersMemory() }
+                    .then()
 
-    private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
+    private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
             .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
-            .concatMap(decoder::decode)
+            .concatMap(wireChunkDecoder::decode)
             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
 
     private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
@@ -75,7 +74,7 @@ internal class VesHvCollector(
 
     private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
             .decode(rawPayload)
-            .filterFailedWithLog(logger,
+            .filterFailedWithLog(logger, clientContext::asMap,
                     { "Ves event header decoded successfully" },
                     { "Failed to decode ves event header, reason: ${it.message}" })
 
@@ -89,15 +88,15 @@ internal class VesHvCollector(
 
     private fun findRoute(msg: VesMessage) = router
             .findDestination(msg)
-            .filterEmptyWithLog(logger,
+            .filterEmptyWithLog(logger, clientContext::asMap,
                     { "Found route for message: ${it.topic}, partition: ${it.partition}" },
                     { "Could not find route for message" })
 
-    private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
-            .also { logger.debug("Released buffer memory after handling message stream") }
+    private fun releaseBuffersMemory() = wireChunkDecoder.release()
+            .also { logger.debug { "Released buffer memory after handling message stream" } }
 
     fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
-            filterFailedWithLog(logger, predicate)
+            filterFailedWithLog(logger, clientContext::asMap, predicate)
 
     companion object {
         private val logger = Logger(VesHvCollector::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt
new file mode 100644 (file)
index 0000000..21b79bb
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import reactor.core.publisher.Flux
+
+@Suppress("TooManyFunctions")
+internal object ClientContextLogging {
+    fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::asMap, block)
+    fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::asMap, block)
+    fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::asMap, block)
+    fun Logger.withDebug(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withDebug(ctx::asMap, block)
+    fun Logger.withTrace(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withTrace(ctx::asMap, block)
+
+    fun Logger.error(ctx: ClientContext, message: () -> String) = error(ctx::asMap, message)
+    fun Logger.warn(ctx: ClientContext, message: () -> String) = warn(ctx::asMap, message)
+    fun Logger.info(ctx: ClientContext, message: () -> String) = info(ctx::asMap, message)
+    fun Logger.debug(ctx: ClientContext, message: () -> String) = debug(ctx::asMap, message)
+    fun Logger.trace(ctx: ClientContext, message: () -> String) = trace(ctx::asMap, message)
+
+    fun <T> Logger.handleReactiveStreamError(context: ClientContext, ex: Throwable,
+                                             returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+        return this.handleReactiveStreamError({ context.asMap() }, ex, returnFlux)
+    }
+}
+
index cea8a7e..bbaa47c 100644 (file)
@@ -52,7 +52,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
     private val retry = retrySpec
             .doOnRetry {
-                logger.warn("Could not get fresh configuration", it.exception())
+                logger.withWarn { log("Could not get fresh configuration", it.exception()) }
                 healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
             }
 
index bdce6f7..3fefc6e 100644 (file)
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.impl.adapters
 
 import io.netty.handler.codec.http.HttpStatusClass
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.slf4j.LoggerFactory
 import reactor.core.publisher.Mono
 import reactor.netty.http.client.HttpClient
@@ -30,8 +31,6 @@ import reactor.netty.http.client.HttpClient
  */
 open class HttpAdapter(private val httpClient: HttpClient) {
 
-    private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
-
     open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
             .get()
             .uri(url + createQueryString(queryParams))
@@ -44,8 +43,8 @@ open class HttpAdapter(private val httpClient: HttpClient) {
                 }
             }
             .doOnError {
-                logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
-                logger.debug("Nested exception:", it)
+                logger.error { "Failed to get resource on path: $url (${it.localizedMessage})" }
+                logger.withDebug { log("Nested exception:", it) }
             }
 
     private fun createQueryString(params: Map<String, Any>): String {
@@ -65,4 +64,9 @@ open class HttpAdapter(private val httpClient: HttpClient) {
         return builder.removeSuffix("&").toString()
     }
 
+    companion object {
+
+
+        private val logger = Logger(HttpAdapter::class)
+    }
 }
index 5f4bf35..ec8593a 100644 (file)
@@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.impl.adapters
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -33,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong
  */
 internal class LoggingSinkProvider : SinkProvider {
 
-    override fun invoke(config: CollectorConfiguration): Sink {
+    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
         return object : Sink {
             private val totalMessages = AtomicLong()
             private val totalBytes = AtomicLong()
@@ -47,9 +50,9 @@ internal class LoggingSinkProvider : SinkProvider {
                 val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
                 val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
                 if (msgs % INFO_LOGGING_FREQ == 0L)
-                    logger.info(logMessageSupplier)
+                    logger.info(ctx, logMessageSupplier)
                 else
-                    logger.trace(logMessageSupplier)
+                    logger.trace(ctx, logMessageSupplier)
             }
 
         }
index c4d6c87..690a7d1 100644 (file)
@@ -20,6 +20,9 @@
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -35,7 +38,8 @@ import java.util.concurrent.atomic.AtomicLong
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
+                         private val ctx: ClientContext) : Sink {
     private val sentMessages = AtomicLong(0)
 
     override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
@@ -45,17 +49,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
                     if (it.isSuccessful()) {
                         Mono.just(it)
                     } else {
-                        logger.warn(it.exception()) { "Failed to send message to Kafka" }
+                        logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
                         Mono.empty<SenderResult<RoutedMessage>>()
                     }
                 }
                 .map { it.correlationMetadata() }
 
-        return if (logger.traceEnabled) {
-            result.doOnNext(::logSentMessage)
-        } else {
-            result
-        }
+        return result.doOnNext(::logSentMessage)
     }
 
     private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -69,7 +69,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
     }
 
     private fun logSentMessage(sentMsg: RoutedMessage) {
-        logger.trace {
+        logger.trace(ctx::asMap, Marker.INVOKE) {
             val msgNum = sentMessages.incrementAndGet()
             "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
         }
index 1819195..b4f470d 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
@@ -33,8 +34,8 @@ import reactor.kafka.sender.SenderOptions
  * @since June 2018
  */
 internal class KafkaSinkProvider : SinkProvider {
-    override fun invoke(config: CollectorConfiguration): Sink {
-        return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+        return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
     }
 
     private fun constructSenderOptions(config: CollectorConfiguration) =
index 0b2997f..6f02d43 100644 (file)
@@ -23,6 +23,11 @@ import arrow.core.getOrElse
 import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
@@ -57,57 +62,64 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
             sslContextFactory
                     .createSslContext(serverConfig.securityConfiguration)
                     .map { sslContext ->
-                        logger.info("Collector configured with SSL enabled")
+                        logger.info { "Collector configured with SSL enabled" }
                         this.secure { b -> b.sslContext(sslContext) }
                     }.getOrElse {
-                        logger.info("Collector configured with SSL disabled")
+                        logger.info { "Collector configured with SSL disabled" }
                         this
                     }
 
-    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
-            collectorProvider().fold(
-                    {
-                        nettyInbound.withConnection { conn ->
-                            logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
-                        }
-                        Mono.empty()
-                    },
-                    {
-                        nettyInbound.withConnection { conn ->
-                            logger.info { "Handling connection from ${conn.address()}" }
-                            conn.configureIdleTimeout(serverConfig.idleTimeout)
-                                    .logConnectionClosed()
-                        }
-                        it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
+    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+        val clientContext = ClientContext(nettyOutbound.alloc())
+        nettyInbound.withConnection {
+            clientContext.clientAddress = it.address()
+        }
+
+        logger.debug(clientContext::asMap, Marker.ENTRY) { "Client connection request received" }
+        return collectorProvider(clientContext).fold(
+                {
+                    logger.warn(clientContext::asMap) { "Collector not ready. Closing connection..." }
+                    Mono.empty()
+                },
+                {
+                    logger.info(clientContext::asMap) { "Handling new connection" }
+                    nettyInbound.withConnection { conn ->
+                        conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+                                .logConnectionClosed(clientContext)
                     }
-            )
+                    it.handleConnection(createDataStream(nettyInbound))
+                }
+        )
+    }
 
     private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
             .receive()
             .retain()
 
-    private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
+    private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
         onReadIdle(timeout.toMillis()) {
-            logger.info {
+            logger.info(ctx) {
                 "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
             }
-            disconnectClient()
+            disconnectClient(ctx)
         }
         return this
     }
 
-    private fun Connection.disconnectClient() {
+    private fun Connection.disconnectClient(ctx: ClientContext) {
         channel().close().addListener {
+            logger.debug(ctx::asMap, Marker.EXIT) { "Closing client channel." }
             if (it.isSuccess)
-                logger.debug { "Channel (${address()}) closed successfully." }
+                logger.debug(ctx) { "Channel closed successfully." }
             else
-                logger.warn("Channel close failed", it.cause())
+                logger.withWarn(ctx) { log("Channel close failed", it.cause()) }
         }
     }
 
-    private fun Connection.logConnectionClosed(): Connection {
+    private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
         onTerminate().subscribe {
-            logger.info("Connection from ${address()} has been closed")
+            // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
+            logger.info(ctx::asMap, Marker.EXIT) { "Connection has been closed" }
         }
         return this
     }
index 4a2ef6b..b735138 100644 (file)
@@ -21,15 +21,17 @@ package org.onap.dcae.collectors.veshv.impl.wire
 
 import arrow.effects.IO
 import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
 import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Flux.defer
 import reactor.core.publisher.SynchronousSink
 
 /**
@@ -38,14 +40,14 @@ import reactor.core.publisher.SynchronousSink
  */
 internal class WireChunkDecoder(
         private val decoder: WireFrameDecoder,
-        alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
-    private val streamBuffer = alloc.compositeBuffer()
+        private val ctx: ClientContext) {
+    private val streamBuffer = ctx.alloc.compositeBuffer()
 
     fun release() {
         streamBuffer.release()
     }
 
-    fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
+    fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = defer {
         logIncomingMessage(byteBuf)
         if (byteBuf.readableBytes() == 0) {
             byteBuf.release()
@@ -53,7 +55,7 @@ internal class WireChunkDecoder(
         } else {
             streamBuffer.addComponent(true, byteBuf)
             generateFrames()
-                    .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+                    .onErrorResume { logger.handleReactiveStreamError(ctx, it, Flux.error(it)) }
                     .doFinally { streamBuffer.discardReadComponents() }
         }
     }
@@ -84,15 +86,15 @@ internal class WireChunkDecoder(
     }
 
     private fun logIncomingMessage(wire: ByteBuf) {
-        logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+        logger.trace(ctx) { "Got message with total size of ${wire.readableBytes()} B" }
     }
 
     private fun logDecodedWireMessage(wire: WireFrameMessage) {
-        logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+        logger.trace(ctx) { "Wire payload size: ${wire.payloadSize} B" }
     }
 
     private fun logEndOfData() {
-        logger.trace { "End of data in current TCP buffer" }
+        logger.trace(ctx) { "End of data in current TCP buffer" }
     }
 
     companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt
new file mode 100644 (file)
index 0000000..305e4cb
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.net.InetSocketAddress
+import java.util.*
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class ClientContext(
+        val alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT,
+        val clientId: String = UUID.randomUUID().toString(),
+        var clientAddress: InetSocketAddress? = null) {
+    fun asMap(): Map<String, String> {
+        val result = mutableMapOf("clientId" to clientId)
+        if (clientAddress != null) {
+            result["clientAddress"] = clientAddress.toString()
+        }
+        return result
+    }
+}
index 437614a..ad97a3f 100644 (file)
@@ -26,15 +26,7 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader
 data class Routing(val routes: List<Route>) {
 
     fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
-            Option.fromNullable(routes.find { it.applies(commonHeader) }).also {
-                if (it.isEmpty()) {
-                    logger.debug { "No route is defined for domain: ${commonHeader.domain}" }
-                }
-            }
-
-    companion object {
-        private val logger = Logger(Routing::class)
-    }
+            Option.fromNullable(routes.find { it.applies(commonHeader) })
 }
 
 data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
index e8a3123..e419016 100644 (file)
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.None
 import arrow.core.Some
+import io.netty.buffer.ByteBufAllocator
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
@@ -30,6 +31,7 @@ import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.model.routing
@@ -56,7 +58,7 @@ object RouterTest : Spek({
                 withFixedPartitioning()
             }
         }.build()
-        val cut = Router(config)
+        val cut = Router(config, ClientContext())
 
         on("message with existing route (rtpm)") {
             val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
index f06a0dc..e0092cf 100644 (file)
@@ -30,6 +30,7 @@ import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import reactor.test.test
 
 /**
@@ -45,7 +46,7 @@ internal object WireChunkDecoderTest : Spek({
 
     fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
 
-    fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
+    fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), ClientContext(alloc))
 
     fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
         for (bb in byteBuffers) {
index 9a4eacf..f4cb6c5 100644 (file)
@@ -1,35 +1,43 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ dcaegen2-collectors-veshv
+  ~ ================================================================================
+  ~ Copyright (C) 2018 NOKIA
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=========================================================
+-->
 <configuration>
-  <property name="LOG_FILE"
-    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
-  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+    <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+    <property name="p_lvl" value="%highlight(%-5level)"/>
+    <property name="p_log" value="%50.50logger"/>WireFrameCodecsTest
+    <property name="SIMPLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg%n"/>
 
-  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>
-        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
-      </pattern>
-    </encoder>
-  </appender>
-
-    <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
-      </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
-        <maxFileSize>50MB</maxFileSize>
-        <maxHistory>30</maxHistory>
-        <totalSizeCap>10GB</totalSizeCap>
-      </rollingPolicy>
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${SIMPLE_LOG_PATTERN}</pattern>
+        </encoder>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
+    <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
 
-  <root level="INFO">
-      <appender-ref ref="CONSOLE"/>
-      <appender-ref ref="ROLLING-FILE"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
     </root>
 </configuration>
index 0897e91..ef4ce96 100644 (file)
@@ -68,7 +68,7 @@ object PerformanceSpecification : Spek({
             )
 
             val fluxes = (1.rangeTo(runs)).map {
-                sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+                sut.collector.handleConnection(generateDataStream(sut.alloc, params))
             }
             val durationMs = measureTimeMillis {
                 Flux.merge(fluxes).then().block(timeout)
@@ -76,8 +76,8 @@ object PerformanceSpecification : Spek({
 
             val durationSec = durationMs / 1000.0
             val throughput = sink.count / durationSec
-            logger.info("Processed $runs connections each containing $numMessages msgs.")
-            logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+            logger.info { "Processed $runs connections each containing $numMessages msgs." }
+            logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
             assertThat(sink.count)
                     .describedAs("should send all events")
                     .isEqualTo(runs * numMessages)
@@ -99,11 +99,11 @@ object PerformanceSpecification : Spek({
 
             val dataStream = generateDataStream(sut.alloc, params)
                     .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
-            sut.collector.handleConnection(sut.alloc, dataStream)
+            sut.collector.handleConnection(dataStream)
                     .timeout(timeout)
                     .block()
 
-            logger.info("Forwarded ${sink.count} msgs")
+            logger.info { "Forwarded ${sink.count} msgs" }
             assertThat(sink.count)
                     .describedAs("should send up to number of events")
                     .isLessThan(numMessages)
index 0495ced..ce242e0 100644 (file)
@@ -27,6 +27,7 @@ import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
@@ -54,7 +55,7 @@ class Sut(sink: Sink = StoringSink()) {
     private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
     val collector: Collector
-        get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+        get() = collectorProvider(ClientContext(alloc)).getOrElse{ throw IllegalStateException("Collector not available.") }
 
     companion object {
         const val MAX_PAYLOAD_SIZE_BYTES = 1024
@@ -63,6 +64,6 @@ class Sut(sink: Sink = StoringSink()) {
 }
 
 fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
-    collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+    collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
     return sink.sentMessages
 }
index 2d81c67..ab59cc2 100644 (file)
@@ -287,7 +287,7 @@ object VesHvSpecification : Spek({
                         .map { vesWireFrameMessage(PERF3GPP) }
 
 
-                sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+                sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
 
                 val messages = sink.sentMessages
                 val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
index 93f2277..fc80a2f 100644 (file)
@@ -1,35 +1,52 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration>
-  <property name="LOG_FILE"
-    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
-  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+    <property name="LOG_FILE"
+              value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
 
-  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>
-        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
-      </pattern>
-    </encoder>
-  </appender>
+
+    <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+    <property name="p_thr" value="%thread"/>
+    <property name="p_lvl" value="%highlight(%-5level)"/>
+    <property name="p_log" value="%50.50logger"/>
+    <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/>
+    <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="ONAP_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_thr}\t
+| ${p_lvl}\t
+| ${p_log}\t
+| ${p_mdc}\t
+| ${p_msg}\t
+| ${p_exc}\t
+| ${p_mak}%n"/>
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${ONAP_LOG_PATTERN}</pattern>
+        </encoder>
+    </appender>
 
     <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
-      </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
-        <maxFileSize>50MB</maxFileSize>
-        <maxHistory>30</maxHistory>
-        <totalSizeCap>10GB</totalSizeCap>
-      </rollingPolicy>
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <encoder>
+            <pattern>${ONAP_LOG_PATTERN}</pattern>
+        </encoder>
+        <file>${LOG_FILE}</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+            <maxFileSize>50MB</maxFileSize>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>10GB</totalSizeCap>
+        </rollingPolicy>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+    <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
 
-  <root level="INFO">
-      <appender-ref ref="CONSOLE"/>
-      <appender-ref ref="ROLLING-FILE"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="ROLLING-FILE"/>
     </root>
 </configuration>
index 417183f..f7d94de 100644 (file)
@@ -46,7 +46,7 @@ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
             throw IllegalArgumentException(message)
         }
 
-        logger.info("Received new configuration. Creating consumer for topics: $topics")
+        logger.info { "Received new configuration. Creating consumer for topics: $topics" }
         consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
     }.fix()
 
index 20c0f59..36f30e6 100644 (file)
@@ -61,13 +61,13 @@ class MessageStreamValidation(
         return messageParams.fold(
                 {
                     logger.warn { "Error while parsing message parameters: ${it::class.qualifiedName} : ${it.message}" }
-                    logger.debug { "Detailed stack trace: ${it}" }
+                    logger.debug { "Detailed stack trace: $it" }
                     throw IllegalArgumentException("Parsing error: " + it.message)
                 },
                 {
                     if (it.isEmpty()) {
                         val message = "Message param list cannot be empty"
-                        logger.warn(message)
+                        logger.warn { message }
                         throw IllegalArgumentException(message)
                     }
                     it
index a6ee112..e54eb35 100644 (file)
@@ -71,15 +71,15 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                 }
                 .delete("messages") { ctx ->
                     ctx.response.contentType(CONTENT_TEXT)
-                    logger.info("Resetting simulator state")
+                    logger.info { "Resetting simulator state" }
                     ctx.response.sendOrError(simulator.resetState())
                 }
                 .get("messages/all/count") { ctx ->
-                    logger.info("Processing request for count of received messages")
+                    logger.info { "Processing request for count of received messages" }
                     simulator.state().fold(
                             {
                                 ctx.response.status(HttpConstants.STATUS_NOT_FOUND)
-                                logger.warn("Error - number of messages could not be specified")
+                                logger.warn { "Error - number of messages could not be specified" }
                             },
                             {
                                 logger.info { "Returned number of received messages: ${it.messagesCount}" }
@@ -90,7 +90,7 @@ class DcaeAppApiServer(private val simulator: DcaeAppSimulator) {
                 }
                 .post("messages/all/validate") { ctx ->
                     ctx.request.body.then { body ->
-                        logger.info("Processing request for message validation")
+                        logger.info { "Processing request for message validation" }
                         val response = simulator.validate(body.inputStream)
                                 .map { isValid ->
                                     if (isValid) {
index 06ff4d5..5856f04 100644 (file)
@@ -43,17 +43,17 @@ fun main(args: Array<String>) =
                 .map(::startApp)
                 .unsafeRunEitherSync(
                         { ex ->
-                            logger.error("Failed to start a server", ex)
+                            logger.withError { log("Failed to start a server", ex) }
                             ExitFailure(1)
                         },
                         {
-                            logger.info("Started DCAE-APP Simulator API server")
+                            logger.info { "Started DCAE-APP Simulator API server" }
                         }
                 )
 
 
 private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
-    logger.info("Using configuration: $config")
+    logger.info { "Using configuration: $config" }
     val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
     val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
     return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
index 4d12b11..ba07c9c 100644 (file)
@@ -1,36 +1,95 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ dcaegen2-collectors-veshv
+  ~ ================================================================================
+  ~ Copyright (C) 2018 NOKIA
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=========================================================
+-->
 <configuration>
-  <property name="LOG_FILE"
-    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
-  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+    <property name="COMPONENT_NAME"
+              value="hv-ves-dcae-app-simulator"/>
+    <property name="COMPONENT_SHORT_NAME"
+              value="dcae-app-simulator"/>
 
-  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>
-        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
-      </pattern>
-    </encoder>
-  </appender>
+    <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
+    <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
+    <property name="ARCHIVE" value="${LOG_PATH}/archive"/>
+
+    <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+    <property name="p_thr" value="%thread"/>
+    <property name="p_lvl" value="%highlight(%-5level)"/>
+    <property name="p_log" value="%50.50logger"/>
+    <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/>
+    <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="SIMPLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| %rootException%n"/>
+    <property name="READABLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| ${p_mak}\t
+| %rootException\t
+| ${p_mdc}\t
+| ${p_thr}%n"/>
+    <property name="ONAP_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_thr}\t
+| ${p_lvl}\t
+| ${p_log}\t
+| ${p_mdc}\t
+| ${p_msg}\t
+| ${p_exc}\t
+| ${p_mak}%n"/>
+    <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/>
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${LOG_PATTERN_IN_USE}</pattern>
+        </encoder>
+    </appender>
 
     <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
-      </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
-        <maxFileSize>50MB</maxFileSize>
-        <maxHistory>30</maxHistory>
-        <totalSizeCap>10GB</totalSizeCap>
-      </rollingPolicy>
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <encoder>
+            <pattern>${LOG_PATTERN_IN_USE}</pattern>
+        </encoder>
+        <file>${LOG_PATH}/${LOG_FILENAME}.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
+            <maxFileSize>50MB</maxFileSize>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>10GB</totalSizeCap>
+        </rollingPolicy>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
-  <!--<logger name="reactor.netty" level="DEBUG"/>-->
+    <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+    <!--<logger name="reactor.netty" level="DEBUG"/>-->
 
-  <root level="INFO">
-      <appender-ref ref="CONSOLE"/>
-      <appender-ref ref="ROLLING-FILE"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="ROLLING-FILE"/>
     </root>
 </configuration>
\ No newline at end of file
diff --git a/sources/hv-collector-domain/src/test/resources/logback.xml b/sources/hv-collector-domain/src/test/resources/logback.xml
deleted file mode 100644 (file)
index 0bf2cb0..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ ============LICENSE_START=======================================================
-  ~ dcaegen2-collectors-veshv
-  ~ ================================================================================
-  ~ Copyright (C) 2018 NOKIA
-  ~ ================================================================================
-  ~ Licensed under the Apache License, Version 2.0 (the "License");
-  ~ you may not use this file except in compliance with the License.
-  ~ You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  ~ ============LICENSE_END=========================================================
-  -->
-<configuration>
-  <property name="LOG_FILE"
-    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
-  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
-
-  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>
-        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
-      </pattern>
-    </encoder>
-  </appender>
-
-    <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
-      </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
-        <maxFileSize>50MB</maxFileSize>
-        <maxHistory>30</maxHistory>
-        <totalSizeCap>10GB</totalSizeCap>
-      </rollingPolicy>
-    </appender>
-
-  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
-
-  <root level="INFO">
-      <appender-ref ref="CONSOLE"/>
-      <appender-ref ref="ROLLING-FILE"/>
-    </root>
-</configuration>
\ No newline at end of file
index 3e5c6aa..86c9efc 100644 (file)
             <groupId>io.arrow-kt</groupId>
             <artifactId>arrow-effects</artifactId>
         </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-test</artifactId>
index 899f51f..5c9566c 100644 (file)
@@ -40,15 +40,15 @@ fun main(args: Array<String>) =
                 .map(::startAndAwaitServers)
                 .unsafeRunEitherSync(
                         { ex ->
-                            logger.error("Failed to start a server", ex)
+                            logger.withError { log("Failed to start a server", ex) }
                             ExitFailure(1)
                         },
-                        { logger.info("Gentle shutdown") }
+                        { logger.info { "Gentle shutdown" } }
                 )
 
 private fun startAndAwaitServers(config: ServerConfiguration) =
         IO.monad().binding {
-            logger.info("Using configuration: $config")
+            logger.info { "Using configuration: $config" }
             HealthCheckServer.start(config).bind()
             VesServer.start(config).bind()
                     .await().bind()
index 5c6f127..13b0bc7 100644 (file)
@@ -31,7 +31,7 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
 abstract class ServerStarter {
     fun start(config: ServerConfiguration): IO<ServerHandle> =
             startServer(config)
-                    .map { logger.info(serverStartedMessage(it)); it }
+                    .map { logger.info { serverStartedMessage(it) }; it }
 
     protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
     protected abstract fun serverStartedMessage(handle: ServerHandle): String
index bee0dae..c88b8aa 100644 (file)
@@ -1,4 +1,23 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ dcaegen2-collectors-veshv
+  ~ ================================================================================
+  ~ Copyright (C) 2018 NOKIA
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=========================================================
+-->
 <configuration>
     <property name="COMPONENT_NAME"
               value="dcae-hv-ves-collector"/>
     <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
     <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
     <property name="ARCHIVE" value="${LOG_PATH}/archive"/>
-    <property name="FILE_LOG_PATTERN" value="
-%nopexception%50.50logger
-| %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
-| %highlight(%-5level)
-| %msg
-| %rootException
-| %thread%n"/>
+
+    <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+    <property name="p_thr" value="%thread"/>
+    <property name="p_lvl" value="%highlight(%-5level)"/>
+    <property name="p_log" value="%50.50logger"/>
+    <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/>
+    <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="SIMPLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| %rootException%n"/>
+    <property name="READABLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| ${p_mak}\t
+| %rootException\t
+| ${p_mdc}\t
+| ${p_thr}%n"/>
+    <property name="ONAP_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_thr}\t
+| ${p_lvl}\t
+| ${p_log}\t
+| ${p_mdc}\t
+| ${p_msg}\t
+| ${p_exc}\t
+| ${p_mak}%n"/>
 
     <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
-            <pattern>${FILE_LOG_PATTERN}</pattern>
+            <pattern>${READABLE_LOG_PATTERN}</pattern>
         </encoder>
     </appender>
 
     <appender name="ROLLING-FILE"
               class="ch.qos.logback.core.rolling.RollingFileAppender">
         <encoder>
-            <pattern>${FILE_LOG_PATTERN}</pattern>
+            <pattern>${ONAP_LOG_PATTERN}</pattern>
         </encoder>
         <file>${LOG_PATH}/${LOG_FILENAME}.log</file>
         <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
index 9a4eacf..400b125 100644 (file)
@@ -1,35 +1,43 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ dcaegen2-collectors-veshv
+  ~ ================================================================================
+  ~ Copyright (C) 2018 NOKIA
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=========================================================
+-->
 <configuration>
-  <property name="LOG_FILE"
-    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
-  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+  <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+  <property name="p_lvl" value="%highlight(%-5level)"/>
+  <property name="p_log" value="%50.50logger"/>
+  <property name="SIMPLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg%n"/>
 
   <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
-      <pattern>
-        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
-      </pattern>
+      <pattern>${SIMPLE_LOG_PATTERN}</pattern>
     </encoder>
   </appender>
 
-    <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
-      </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
-        <maxFileSize>50MB</maxFileSize>
-        <maxHistory>30</maxHistory>
-        <totalSizeCap>10GB</totalSizeCap>
-      </rollingPolicy>
-    </appender>
-
   <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
 
   <root level="INFO">
-      <appender-ref ref="CONSOLE"/>
-      <appender-ref ref="ROLLING-FILE"/>
-    </root>
+    <appender-ref ref="CONSOLE"/>
+  </root>
 </configuration>
index d017b31..6ca28a5 100644 (file)
@@ -31,7 +31,7 @@ import java.time.Duration
 private val logger = Logger("org.onap.dcae.collectors.veshv.tests.utils")
 
 object Assertions : org.assertj.core.api.Assertions() {
-    fun <A,B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
+    fun <A, B> assertThat(actual: Either<A, B>) = EitherAssert(actual)
 }
 
 
@@ -42,7 +42,7 @@ fun waitUntilSucceeds(retries: Int, sleepTime: Duration, action: () -> Unit) {
     while (tryNum <= retries) {
         tryNum++
         try {
-            logger.debug("Try number $tryNum")
+            logger.debug { "Try number $tryNum" }
             action()
             break
         } catch (ex: Throwable) {
index 5a733f2..a25b291 100644 (file)
@@ -51,7 +51,7 @@ fun <A> ratpack.http.Response.sendEitherErrorOrResponse(response: Either<A, Resp
 fun ratpack.http.Response.sendAndHandleErrors(response: IO<Response>) {
     response.attempt().unsafeRunSync().fold(
             { err ->
-                logger.warn("Error occurred. Sending .", err)
+                logger.withWarn { log("Error occurred. Sending .", err) }
                 val message = err.message
                 send(errorResponse(message))
             },
index 033dd5e..1e5c9c5 100644 (file)
@@ -21,117 +21,214 @@ package org.onap.dcae.collectors.veshv.utils.logging
 
 import kotlin.reflect.KClass
 import org.slf4j.LoggerFactory
+import org.slf4j.MDC
+
+typealias MappedDiagnosticContext = () -> Map<String, String>
 
 @Suppress("TooManyFunctions", "SuboptimalLoggerUsage")
-class Logger(val logger: org.slf4j.Logger) {
+class Logger(logger: org.slf4j.Logger) {
     constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java))
     constructor(name: String) : this(LoggerFactory.getLogger(name))
 
-    //
-    // TRACE
-    //
+    private val errorLogger = if (logger.isErrorEnabled) ErrorLevelLogger(logger) else OffLevelLogger
+    private val warnLogger = if (logger.isWarnEnabled) WarnLevelLogger(logger) else OffLevelLogger
+    private val infoLogger = if (logger.isInfoEnabled) InfoLevelLogger(logger) else OffLevelLogger
+    private val debugLogger = if (logger.isDebugEnabled) DebugLevelLogger(logger) else OffLevelLogger
+    private val traceLogger = if (logger.isTraceEnabled) TraceLevelLogger(logger) else OffLevelLogger
 
-    val traceEnabled: Boolean
-        get() = logger.isTraceEnabled
+    // ERROR
 
-    fun trace(messageProvider: () -> String) {
-        if (logger.isTraceEnabled) {
-            logger.trace(messageProvider())
-        }
+    fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block()
+
+    fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            errorLogger.withMdc(mdc, block)
+
+    fun error(message: () -> String) = errorLogger.run {
+        log(message())
     }
 
-    //
+    fun error(mdc: MappedDiagnosticContext, message: () -> String) =
+            errorLogger.withMdc(mdc) { log(message()) }
+
+    fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+            errorLogger.withMdc(mdc) { log(marker, message()) }
+
+    // WARN
+
+    fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
+
+    fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            warnLogger.withMdc(mdc, block)
+
+    fun warn(message: () -> String) = warnLogger.run {
+        log(message())
+    }
+
+    fun warn(mdc: MappedDiagnosticContext, message: () -> String) =
+            warnLogger.withMdc(mdc) { log(message()) }
+
+    fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+            warnLogger.withMdc(mdc) { log(marker, message()) }
+
+    // INFO
+
+    fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block()
+
+    fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            infoLogger.withMdc(mdc, block)
+
+    fun info(message: () -> String) = infoLogger.run {
+        log(message())
+    }
+
+    fun info(mdc: MappedDiagnosticContext, message: () -> String) =
+            infoLogger.withMdc(mdc) { log(message()) }
+
+    fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+            infoLogger.withMdc(mdc) { log(marker, message()) }
+
     // DEBUG
-    //
 
-    fun debug(message: String) {
-        logger.debug(message)
+    fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block()
+
+    fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            debugLogger.withMdc(mdc, block)
+
+    fun debug(message: () -> String) = debugLogger.run {
+        log(message())
     }
 
-    fun debug(message: String, t: Throwable) {
-        logger.debug(message, t)
+    fun debug(mdc: MappedDiagnosticContext, message: () -> String) =
+            debugLogger.withMdc(mdc) { log(message()) }
+
+    fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+            debugLogger.withMdc(mdc) { log(marker, message()) }
+
+    // TRACE
+
+    fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block()
+
+    fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
+            traceLogger.withMdc(mdc, block)
+
+    fun trace(message: () -> String) = traceLogger.run {
+        log(message())
     }
 
-    fun debug(messageProvider: () -> String) {
-        if (logger.isDebugEnabled) {
-            logger.debug(messageProvider())
+    fun trace(mdc: MappedDiagnosticContext, message: () -> String) =
+            traceLogger.withMdc(mdc) { log(message()) }
+
+    fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+            traceLogger.withMdc(mdc) { log(marker, message()) }
+
+}
+
+abstract class AtLevelLogger {
+    abstract fun log(message: String)
+    abstract fun log(message: String, t: Throwable)
+    abstract fun log(marker: Marker, message: String)
+    open val enabled: Boolean
+        get() = true
+
+    inline fun withMdc(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) {
+        if (enabled) {
+            try {
+                MDC.setContextMap(mdc())
+                block()
+            } finally {
+                MDC.clear()
+            }
         }
     }
+}
 
-    fun debug(t: Throwable, messageProvider: () -> String) {
-        if (logger.isDebugEnabled) {
-            logger.debug(messageProvider(), t)
-        }
+object OffLevelLogger : AtLevelLogger() {
+    override val enabled = false
+
+    override fun log(message: String) {
+        // do not log anything
     }
 
-    //
-    // INFO
-    //
-    fun info(message: String) {
-        logger.info(message)
+    override fun log(message: String, t: Throwable) {
+        // do not log anything
     }
 
-    fun info(messageProvider: () -> String) {
-        if (logger.isInfoEnabled) {
-            logger.info(messageProvider())
-        }
+    override fun log(marker: Marker, message: String) {
+        // do not log anything
     }
+}
 
-    fun info(message: String, t: Throwable) {
-        logger.info(message, t)
+@Suppress("SuboptimalLoggerUsage")
+class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
+        logger.error(message)
     }
 
-    fun info(t: Throwable, messageProvider: () -> String) {
-        if (logger.isInfoEnabled) {
-            logger.info(messageProvider(), t)
-        }
+    override fun log(message: String, t: Throwable) {
+        logger.error(message, t)
     }
 
-    //
-    // WARN
-    //
+    override fun log(marker: Marker, message: String) {
+        logger.error(marker(), message)
+    }
+}
 
-    fun warn(message: String) {
+@Suppress("SuboptimalLoggerUsage")
+class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
         logger.warn(message)
     }
 
-    fun warn(message: String, t: Throwable) {
+    override fun log(message: String, t: Throwable) {
         logger.warn(message, t)
     }
 
-    fun warn(messageProvider: () -> String) {
-        if (logger.isWarnEnabled) {
-            logger.warn(messageProvider())
-        }
+    override fun log(marker: Marker, message: String) {
+        logger.warn(marker(), message)
     }
+}
 
-    fun warn(t: Throwable, messageProvider: () -> String) {
-        if (logger.isWarnEnabled) {
-            logger.warn(messageProvider(), t)
-        }
+@Suppress("SuboptimalLoggerUsage")
+class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
+        logger.info(message)
     }
 
-    //
-    // ERROR
-    //
+    override fun log(message: String, t: Throwable) {
+        logger.info(message, t)
+    }
 
-    fun error(message: String) {
-        logger.error(message)
+    override fun log(marker: Marker, message: String) {
+        logger.info(marker(), message)
     }
+}
 
-    fun error(message: String, t: Throwable) {
-        logger.error(message, t)
+@Suppress("SuboptimalLoggerUsage")
+class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
+        logger.debug(message)
     }
 
-    fun error(messageProvider: () -> String) {
-        if (logger.isErrorEnabled) {
-            logger.error(messageProvider())
-        }
+    override fun log(message: String, t: Throwable) {
+        logger.debug(message, t)
     }
 
-    fun error(t: Throwable, messageProvider: () -> String) {
-        if (logger.isErrorEnabled) {
-            logger.error(messageProvider(), t)
-        }
+    override fun log(marker: Marker, message: String) {
+        logger.debug(marker(), message)
+    }
+}
+
+@Suppress("SuboptimalLoggerUsage")
+class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() {
+    override fun log(message: String) {
+        logger.trace(message)
+    }
+
+    override fun log(message: String, t: Throwable) {
+        logger.trace(message, t)
+    }
+
+    override fun log(marker: Marker, message: String) {
+        logger.trace(marker(), message)
     }
 }
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt
new file mode 100644 (file)
index 0000000..83fb9a5
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import org.slf4j.MarkerFactory
+
+enum class Marker(private val marker: org.slf4j.Marker) {
+    ENTRY(MarkerFactory.getMarker("ENTRY")),
+    EXIT(MarkerFactory.getMarker("EXIT")),
+    INVOKE(MarkerFactory.getMarker("INVOKE"));
+
+    operator fun invoke() = marker
+}
index e8ec254..95590d9 100644 (file)
@@ -25,42 +25,48 @@ import arrow.core.Try
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 
-fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
-    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
-    logger.debug("Detailed stack trace", ex)
+fun <T> Logger.handleReactiveStreamError(
+        context: MappedDiagnosticContext,
+        ex: Throwable,
+        returnFlux: Flux<T> = Flux.empty()): Flux<T> {
+    warn(context) { "Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})" }
+    withDebug(context) { log("Detailed stack trace", ex) }
     return returnFlux
 }
 
-
 fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+                                   context: MappedDiagnosticContext,
                                    acceptedMsg: (T) -> String,
                                    rejectedMsg: (Throwable) -> String): Flux<T> =
-        fold({
-            logger.warn(rejectedMsg(it))
+        fold({ ex ->
+            logger.withWarn(context) { log(rejectedMsg(ex)) }
             Flux.empty<T>()
-        }, {
-            logger.trace { acceptedMsg(it) }
-            Flux.just(it)
+        }, { obj ->
+            logger.trace(context) { acceptedMsg(obj) }
+            Flux.just(obj)
         })
 
 fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+                                     context: MappedDiagnosticContext,
                                      acceptedMsg: (T) -> String,
                                      rejectedMsg: () -> String): Flux<T> =
         fold({
-            logger.warn(rejectedMsg)
+            logger.warn(context, rejectedMsg)
             Flux.empty<T>()
         }, {
-            logger.trace { acceptedMsg(it) }
+            logger.trace(context) { acceptedMsg(it) }
             Flux.just(it)
         })
 
-fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger,
+                                    context: MappedDiagnosticContext,
+                                    predicate: (T) -> Either<() -> String, () -> String>) =
         flatMap { t ->
             predicate(t).fold({
-                logger.warn(it)
+                logger.warn(context, it)
                 Mono.empty<T>()
             }, {
-                logger.trace(it)
+                logger.trace(context, it)
                 Mono.just<T>(t)
             })
         }
index c27fb8c..10fc8d8 100644 (file)
@@ -34,11 +34,16 @@ import org.jetbrains.spek.api.dsl.it
 object LoggerTest : Spek({
 
     lateinit var slf4jLogger: org.slf4j.Logger
-    lateinit var cut: Logger
+    fun cut() = Logger(slf4jLogger).also {
+        verify(slf4jLogger).isTraceEnabled
+        verify(slf4jLogger).isDebugEnabled
+        verify(slf4jLogger).isInfoEnabled
+        verify(slf4jLogger).isWarnEnabled
+        verify(slf4jLogger).isErrorEnabled
+    }
 
     beforeEachTest {
         slf4jLogger = mock()
-        cut = Logger(slf4jLogger)
     }
 
     afterEachTest {
@@ -50,28 +55,19 @@ object LoggerTest : Spek({
         val exception = Exception("fail")
 
         describe("debug levels") {
-            it("should log message") {
-                cut.debug(message)
-                verify(slf4jLogger).debug(message)
-            }
-
-            it("should log message with exception") {
-                cut.debug(message, exception)
-                verify(slf4jLogger).debug(message, exception)
-            }
 
             describe("lazy logging message") {
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isDebugEnabled).thenReturn(true)
-                    cut.debug { message }
+                    cut().debug { message }
                     verify(slf4jLogger).isDebugEnabled
                     verify(slf4jLogger).debug(message)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isDebugEnabled).thenReturn(false)
-                    cut.debug { message }
+                    cut().debug { message }
                     verify(slf4jLogger).isDebugEnabled
                 }
             }
@@ -80,42 +76,33 @@ object LoggerTest : Spek({
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isDebugEnabled).thenReturn(true)
-                    cut.debug(exception) { message }
+                    cut().withDebug { log(message, exception) }
                     verify(slf4jLogger).isDebugEnabled
                     verify(slf4jLogger).debug(message, exception)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isDebugEnabled).thenReturn(false)
-                    cut.debug(exception) { message }
+                    cut().withDebug { log(message, exception) }
                     verify(slf4jLogger).isDebugEnabled
                 }
             }
         }
 
         describe("info levels") {
-            it("should log message") {
-                cut.info(message)
-                verify(slf4jLogger).info(message)
-            }
-
-            it("should log message with exception") {
-                cut.info(message, exception)
-                verify(slf4jLogger).info(message, exception)
-            }
 
             describe("lazy logging message") {
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isInfoEnabled).thenReturn(true)
-                    cut.info { message }
+                    cut().info { message }
                     verify(slf4jLogger).isInfoEnabled
                     verify(slf4jLogger).info(message)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isInfoEnabled).thenReturn(false)
-                    cut.info { message }
+                    cut().info { message }
                     verify(slf4jLogger).isInfoEnabled
                 }
             }
@@ -124,42 +111,32 @@ object LoggerTest : Spek({
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isInfoEnabled).thenReturn(true)
-                    cut.info(exception) { message }
+                    cut().withInfo { log(message, exception) }
                     verify(slf4jLogger).isInfoEnabled
                     verify(slf4jLogger).info(message, exception)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isInfoEnabled).thenReturn(false)
-                    cut.info(exception) { message }
+                    cut().withInfo { log(message, exception) }
                     verify(slf4jLogger).isInfoEnabled
                 }
             }
         }
 
         describe("warning levels") {
-            it("should log message") {
-                cut.warn(message)
-                verify(slf4jLogger).warn(message)
-            }
-
-            it("should log message with exception") {
-                cut.warn(message, exception)
-                verify(slf4jLogger).warn(message, exception)
-            }
-
             describe("lazy logging message") {
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isWarnEnabled).thenReturn(true)
-                    cut.warn { message }
+                    cut().warn { message }
                     verify(slf4jLogger).isWarnEnabled
                     verify(slf4jLogger).warn(message)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isWarnEnabled).thenReturn(false)
-                    cut.warn { message }
+                    cut().warn { message }
                     verify(slf4jLogger).isWarnEnabled
                 }
             }
@@ -168,42 +145,33 @@ object LoggerTest : Spek({
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isWarnEnabled).thenReturn(true)
-                    cut.warn(exception) { message }
+                    cut().withWarn { log(message, exception) }
                     verify(slf4jLogger).isWarnEnabled
                     verify(slf4jLogger).warn(message, exception)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isWarnEnabled).thenReturn(false)
-                    cut.warn(exception) { message }
+                    cut().withWarn { log(message, exception) }
                     verify(slf4jLogger).isWarnEnabled
                 }
             }
         }
 
         describe("error levels") {
-            it("should log message") {
-                cut.error(message)
-                verify(slf4jLogger).error(message)
-            }
-
-            it("should log message with exception") {
-                cut.error(message, exception)
-                verify(slf4jLogger).error(message, exception)
-            }
 
             describe("lazy logging message") {
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isErrorEnabled).thenReturn(true)
-                    cut.error { message }
+                    cut().error { message }
                     verify(slf4jLogger).isErrorEnabled
                     verify(slf4jLogger).error(message)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isErrorEnabled).thenReturn(false)
-                    cut.error { message }
+                    cut().error { message }
                     verify(slf4jLogger).isErrorEnabled
                 }
             }
@@ -212,14 +180,14 @@ object LoggerTest : Spek({
 
                 it("should log when debug is ON") {
                     whenever(slf4jLogger.isErrorEnabled).thenReturn(true)
-                    cut.error(exception) { message }
+                    cut().withError { log(message, exception) }
                     verify(slf4jLogger).isErrorEnabled
                     verify(slf4jLogger).error(message, exception)
                 }
 
                 it("should not log when debug is OFF") {
                     whenever(slf4jLogger.isErrorEnabled).thenReturn(false)
-                    cut.error(exception) { message }
+                    cut().withError { log(message, exception) }
                     verify(slf4jLogger).isErrorEnabled
                 }
             }
index 0f359df..da956be 100644 (file)
@@ -42,7 +42,7 @@ class ReactiveLoggingTest : Spek({
                 val cut = Try.just(event)
 
                 it("should not filter stream event and log accepted message") {
-                    cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+                    cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
                             .test()
                             .expectNext(event)
                             .verifyComplete()
@@ -53,7 +53,7 @@ class ReactiveLoggingTest : Spek({
                 val e = Exception()
                 val cut = Failure(e)
                 it("should filter stream event and log rejected message") {
-                    cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+                    cut.filterFailedWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
                             .test()
                             .verifyComplete()
                 }
@@ -65,7 +65,7 @@ class ReactiveLoggingTest : Spek({
                 val cut = Option.just(event)
 
                 it("should not filter stream event and log accepted message") {
-                    cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+                    cut.filterEmptyWithLog(logger, ::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE)
                             .test()
                             .expectNext(event)
                             .verifyComplete()
@@ -75,7 +75,7 @@ class ReactiveLoggingTest : Spek({
             given("empty Option") {
                 val cut = Option.empty<Int>()
                 it("should filter stream event and log rejected message") {
-                    cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+                    cut.filterEmptyWithLog(logger,::emptyMap, ACCEPTED_MESSAGE, FAILED_MESSAGE)
                             .test()
                             .verifyComplete()
                 }
@@ -88,7 +88,7 @@ class ReactiveLoggingTest : Spek({
                 val cut = Flux.just(event)
 
                 it("should not filter stream event and log accepted message") {
-                    cut.filterFailedWithLog(logger, right())
+                    cut.filterFailedWithLog(logger,::emptyMap, right())
                             .test()
                             .expectNext(event)
                             .verifyComplete()
@@ -99,7 +99,7 @@ class ReactiveLoggingTest : Spek({
                 val cut = Flux.just(event)
 
                 it("should filter stream event and log rejected message") {
-                    cut.filterFailedWithLog(logger, left())
+                    cut.filterFailedWithLog(logger,::emptyMap, left())
                             .test()
                             .verifyComplete()
                 }
index 9a4eacf..400b125 100644 (file)
@@ -1,35 +1,43 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ dcaegen2-collectors-veshv
+  ~ ================================================================================
+  ~ Copyright (C) 2018 NOKIA
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=========================================================
+-->
 <configuration>
-  <property name="LOG_FILE"
-    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
-  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+  <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+  <property name="p_lvl" value="%highlight(%-5level)"/>
+  <property name="p_log" value="%50.50logger"/>
+  <property name="SIMPLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg%n"/>
 
   <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
-      <pattern>
-        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
-      </pattern>
+      <pattern>${SIMPLE_LOG_PATTERN}</pattern>
     </encoder>
   </appender>
 
-    <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
-      </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
-        <maxFileSize>50MB</maxFileSize>
-        <maxHistory>30</maxHistory>
-        <totalSizeCap>10GB</totalSizeCap>
-      </rollingPolicy>
-    </appender>
-
   <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
 
   <root level="INFO">
-      <appender-ref ref="CONSOLE"/>
-      <appender-ref ref="ROLLING-FILE"/>
-    </root>
+    <appender-ref ref="CONSOLE"/>
+  </root>
 </configuration>
index 57aaf3d..ca6d169 100644 (file)
@@ -61,12 +61,14 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
                 .handle { _, output -> handler(complete, messages, output) }
                 .connect()
                 .doOnError {
-                    logger.info("Failed to connect to VesHvCollector on " +
-                            "${configuration.vesHost}:${configuration.vesPort}")
+                    logger.info {
+                        "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
+                    }
                 }
                 .subscribe {
-                    logger.info("Connected to VesHvCollector on " +
-                            "${configuration.vesHost}:${configuration.vesPort}")
+                    logger.info {
+                        "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
+                    }
                 }
         return complete.then()
     }
@@ -86,7 +88,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
                 .options { it.flushOnBoundary() }
                 .sendGroups(frames)
                 .then {
-                    logger.info("Messages have been sent")
+                    logger.info { "Messages have been sent" }
                     complete.onComplete()
                 }
                 .then()
index 1601938..cfd3a6e 100644 (file)
@@ -59,17 +59,17 @@ internal class XnfApiServer(
                 .post("simulator/async", ::startSimulationHandler)
                 .get("simulator/:id", ::simulatorStatusHandler)
                 .get("healthcheck") { ctx ->
-                    logger.info("Checking health")
+                    logger.info { "Checking health" }
                     ctx.response.status(HttpConstants.STATUS_OK).send()
                 }
     }
 
     private fun startSimulationHandler(ctx: Context) {
-        logger.info("Attempting to start asynchronous scenario")
+        logger.info { "Attempting to start asynchronous scenario" }
         ctx.request.body.then { body ->
             val id = startSimulation(body)
             when (id) {
-                is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}"}
+                is Either.Left -> logger.warn { "Failed to start scenario, ${id.a}" }
                 is Either.Right -> logger.info { "Scenario started, details: ${id.b}" }
             }
             ctx.response.sendEitherErrorOrResponse(id)
@@ -83,7 +83,7 @@ internal class XnfApiServer(
     }
 
     private fun simulatorStatusHandler(ctx: Context) {
-        logger.debug("Checking task status")
+        logger.debug { "Checking task status" }
         val id = UUID.fromString(ctx.pathTokens["id"])
         logger.debug { "Checking status for id: $id" }
         val status = ongoingSimulations.status(id)
index 21748ae..d7d42d8 100644 (file)
@@ -43,11 +43,11 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
         simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
             result.fold(
                     { err ->
-                        logger.warn("Error", err)
+                        logger.withWarn { log("Error", err) }
                         simulations[id] = StatusFailure(err)
                     },
                     {
-                        logger.info("Finished sending messages")
+                        logger.info { "Finished sending messages" }
                         simulations[id] = StatusSuccess
                     }
             )
index 4512dfb..91070d3 100644 (file)
@@ -42,7 +42,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
 fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
         .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
         .map { config ->
-            logger.info("Using configuration: $config")
+            logger.info { "Using configuration: $config" }
             val xnfSimulator = XnfSimulator(
                     VesHvClient(config),
                     MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
@@ -52,10 +52,10 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
         }
         .unsafeRunEitherSync(
                 { ex ->
-                    logger.error("Failed to start a server", ex)
+                    logger.withError { log("Failed to start a server", ex) }
                     ExitFailure(1)
                 },
                 {
-                    logger.info("Started xNF Simulator API server")
+                    logger.info { "Started xNF Simulator API server" }
                 }
         )
index 809f62d..2bc3f97 100644 (file)
@@ -1,35 +1,94 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ ============LICENSE_START=======================================================
+  ~ dcaegen2-collectors-veshv
+  ~ ================================================================================
+  ~ Copyright (C) 2018 NOKIA
+  ~ ================================================================================
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~ ============LICENSE_END=========================================================
+-->
 <configuration>
-  <property name="LOG_FILE"
-    value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
-  <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+    <property name="COMPONENT_NAME"
+              value="hv-ves-xnf-simulator"/>
+    <property name="COMPONENT_SHORT_NAME"
+              value="xnf-simulatr"/>
 
-  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>
-        %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
-      </pattern>
-    </encoder>
-  </appender>
+    <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
+    <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
+    <property name="ARCHIVE" value="${LOG_PATH}/archive"/>
+
+    <property name="p_tim" value="%date{&quot;yyyy-MM-dd'T'HH:mm:ss.SSSXXX&quot;, UTC}"/>
+    <property name="p_thr" value="%thread"/>
+    <property name="p_lvl" value="%highlight(%-5level)"/>
+    <property name="p_log" value="%50.50logger"/>
+    <property name="p_mdc" value="%replace(%replace(%mdc){'\t', '\\\\t'}){'\n', '\\\\n'}"/>
+    <property name="p_msg" value="%replace(%replace(%msg){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="p_exc" value="%replace(%replace(%rootException){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="p_mak" value="%replace(%replace(%marker){'\t', '\\\\t'}){'\n','\\\\n'}"/>
+    <property name="SIMPLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| %rootException%n"/>
+    <property name="READABLE_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_log}\t
+| ${p_lvl}\t
+| %msg\t
+| ${p_mak}\t
+| %rootException\t
+| ${p_mdc}\t
+| ${p_thr}%n"/>
+    <property name="ONAP_LOG_PATTERN" value="
+%nopexception
+| ${p_tim}\t
+| ${p_thr}\t
+| ${p_lvl}\t
+| ${p_log}\t
+| ${p_mdc}\t
+| ${p_msg}\t
+| ${p_exc}\t
+| ${p_mak}%n"/>
+    <property name="LOG_PATTERN_IN_USE" value="${SIMPLE_LOG_PATTERN}"/>
+
+    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>${LOG_PATTERN_IN_USE}</pattern>
+        </encoder>
+    </appender>
 
     <appender name="ROLLING-FILE"
-      class="ch.qos.logback.core.rolling.RollingFileAppender">
-      <encoder>
-        <pattern>${FILE_LOG_PATTERN}</pattern>
-      </encoder>
-      <file>${LOG_FILE}</file>
-      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-        <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
-        <maxFileSize>50MB</maxFileSize>
-        <maxHistory>30</maxHistory>
-        <totalSizeCap>10GB</totalSizeCap>
-      </rollingPolicy>
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <encoder>
+            <pattern>${LOG_PATTERN_IN_USE}</pattern>
+        </encoder>
+        <file>${LOG_PATH}/${LOG_FILENAME}.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
+            <maxFileSize>50MB</maxFileSize>
+            <maxHistory>30</maxHistory>
+            <totalSizeCap>10GB</totalSizeCap>
+        </rollingPolicy>
     </appender>
 
-  <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+    <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
 
-  <root level="INFO">
-      <appender-ref ref="CONSOLE"/>
-      <appender-ref ref="ROLLING-FILE"/>
+    <root level="INFO">
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="ROLLING-FILE"/>
     </root>
 </configuration>
\ No newline at end of file