<artifactId>hv-collector-main</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-server</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-ssl</artifactId>
<dependencies>
<dependency>
- <groupId>${project.parent.groupId}</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>hv-collector-configuration</artifactId>
- <version>${project.parent.version}</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>${project.parent.groupId}</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>hv-collector-domain</artifactId>
- <version>${project.parent.version}</version>
+ <version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>${project.parent.groupId}</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>hv-collector-utils</artifactId>
- <version>${project.parent.version}</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>${project.parent.groupId}</groupId>
+ <groupId>${project.groupId}</groupId>
<artifactId>hv-collector-ssl</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-test-utils</artifactId>
- <version>${project.parent.version}</version>
- <scope>test</scope>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
package org.onap.dcae.collectors.veshv.boundary
import io.netty.buffer.ByteBuf
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.utils.Closeable
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
operator fun invoke(ctx: ClientContext): Collector
}
-interface Server {
- fun start(): Mono<ServerHandle>
-}
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import io.netty.handler.codec.http.HttpStatusClass
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
+import org.onap.dcae.collectors.veshv.domain.logging.OnapMdc
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import java.util.*
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.withDebug
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.domain.logging.Marker
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.trace
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.warn
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.impl.createKafkaSender
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.handleReactiveStreamError
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.trace
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Flux.defer
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import reactor.test.test
/**
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
</build>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.onap.dcaegen2.services.sdk</groupId>
<artifactId>hvvesclient-protobuf</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.jetbrains.spek</groupId>
<groupId>org.jetbrains.spek</groupId>
<artifactId>spek-junit-platform-engine</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.nhaarman.mockitokotlin2</groupId>
+ <artifactId>mockito-kotlin</artifactId>
+ </dependency>
</dependencies>
</project>
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.model
+package org.onap.dcae.collectors.veshv.domain.logging
import arrow.core.None
import arrow.core.Option
import arrow.core.getOrElse
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
import java.net.InetAddress
import java.security.cert.X509Certificate
import java.util.*
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.domain.logging
-import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
@Suppress("TooManyFunctions")
-internal object ClientContextLogging {
+object ClientContextLogging {
fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block)
fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block)
fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block)
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.utils.logging
+package org.onap.dcae.collectors.veshv.domain.logging
import org.slf4j.MarkerFactory
import java.time.Instant
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.logging
+
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
+import org.slf4j.MDC
+
+
+@Suppress("TooManyFunctions")
+object MarkerLogging {
+ fun Logger.error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ withError(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } }
+
+ fun Logger.error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
+ withError(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message(), t) } }
+
+ fun Logger.warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ withWarn(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } }
+
+ fun Logger.warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
+ withWarn(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message(), t) } }
+
+ fun Logger.info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ withInfo(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } }
+
+ fun Logger.debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ withDebug(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } }
+
+ fun Logger.trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
+ withTrace(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } }
+
+
+ private inline fun withAdditionalMdc(mdc: Map<String, String>, block: () -> Unit) {
+ if (mdc.isEmpty()) {
+ block()
+ } else {
+ try {
+ mdc.forEach(MDC::put)
+ block()
+ } finally {
+ mdc.keys.forEach(MDC::remove)
+ }
+ }
+ }
+}
\ No newline at end of file
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.utils.logging
+package org.onap.dcae.collectors.veshv.domain.logging
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.model
+package org.onap.dcae.collectors.veshv.domain.logging
-import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
import java.net.InetAddress
import java.net.UnknownHostException
import java.util.*
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.model
+package org.onap.dcae.collectors.veshv.domain.logging
import arrow.core.Some
import com.nhaarman.mockitokotlin2.mock
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
-import java.net.Inet4Address
import java.net.InetAddress
-import java.net.InetSocketAddress
import java.security.cert.X509Certificate
-import java.util.*
import javax.security.auth.x500.X500Principal
/**
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.model
+package org.onap.dcae.collectors.veshv.domain.logging
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc
import java.util.*
/**
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration>
--- /dev/null
+mock-maker-inline
\ No newline at end of file
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>hv-collector-core</artifactId>
+ <artifactId>hv-collector-health-check</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>hv-collector-health-check</artifactId>
+ <artifactId>hv-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
*/
package org.onap.dcae.collectors.veshv.main
+import org.onap.dcae.collectors.veshv.api.ServersFactory
import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
-import org.onap.dcae.collectors.veshv.main.servers.VesServer
-import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
+import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
private val hvVesServer = AtomicReference<ServerHandle>()
private val configurationModule = ConfigurationModule()
+private val sslContextFactory = SslContextFactory()
private val maxCloseTime = Duration.ofSeconds(10)
fun main(args: Array<String>) {
private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer {
Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
logger.debug(ServiceContext::mdc) { "Configuration: $config" }
- VesServer.start(config)
+ ServersFactory.createHvVesServer(
+ config,
+ sslContextFactory,
+ MicrometerMetrics.INSTANCE
+ ).start()
}
private fun stopRunningServer() = Mono.defer {
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
-import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.net.InetSocketAddress
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2019 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-sources</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-server</artifactId>
+ <description>VES HighVolume Collector :: Server</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-core</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-ssl</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.factory
+package org.onap.dcae.collectors.veshv.api
-import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.impl.HvVesServer
import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Mono
+
+interface Server {
+ fun start(): Mono<ServerHandle>
+}
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-object ServerFactory {
-
- private val sslFactory = SslContextFactory()
-
- fun createNettyTcpServer(serverConfig: ServerConfiguration,
- securityConfig: SecurityConfiguration,
- collectorFactory: CollectorFactory,
- metrics: Metrics
- ): Server = NettyTcpServer(
- serverConfig,
- sslFactory.createServerContext(securityConfig),
- collectorFactory,
- metrics
- )
+object ServersFactory {
+ fun createHvVesServer(config: HvVesConfiguration,
+ sslContextFactory: SslContextFactory,
+ metrics: Metrics): Server = HvVesServer(config, sslContextFactory, metrics)
}
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.main.servers
+package org.onap.dcae.collectors.veshv.impl
-import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.api.Server
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory
-import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.factory.AdapterFactory
-import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
-import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
+import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Mono
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-object VesServer {
+internal class HvVesServer(private val config: HvVesConfiguration,
+ private val sslFactory: SslContextFactory,
+ private val metrics: Metrics) : Server {
- private val logger = Logger(VesServer::class)
+ private val logger = Logger(HvVesServer::class)
- fun start(config: HvVesConfiguration): Mono<ServerHandle> =
- createVesServer(config)
+ override fun start(): Mono<ServerHandle> =
+ createNettyTcpServer(config)
.start()
.doOnNext(::logServerStarted)
- private fun createVesServer(config: HvVesConfiguration): Server =
- createCollectorProvider(config)
- .let { collectorProvider ->
- ServerFactory.createNettyTcpServer(
- config.server,
- config.security,
- collectorProvider,
- MicrometerMetrics.INSTANCE
- )
- }
+ private fun createNettyTcpServer(config: HvVesConfiguration): Server =
+ NettyTcpServer(
+ config.server,
+ sslFactory.createServerContext(config.security),
+ createCollectorProvider(config),
+ metrics
+ )
private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory =
HvVesCollectorFactory(
config.collector,
AdapterFactory.sinkCreatorFactory(),
- MicrometerMetrics.INSTANCE
+ metrics
)
private fun logServerStarted(handle: ServerHandle) =
logger.info(ServiceContext::mdc) {
"HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
}
-
}
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.socket
+package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
import arrow.core.getOrElse
import io.netty.handler.ssl.SslContext
+import org.onap.dcae.collectors.veshv.api.Server
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorFactory
import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.domain.logging.Marker
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.debug
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.info
+import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.warn
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.info
import reactor.core.publisher.Mono
import reactor.netty.Connection
import reactor.netty.NettyInbound
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.socket
+package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
import arrow.core.Try
import arrow.syntax.collections.firstOption
import io.netty.handler.ssl.SslHandler
import io.netty.util.concurrent.Future
-import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import reactor.core.publisher.Mono
import reactor.netty.ByteBufFlux
import reactor.netty.Connection
</build>
<dependencies>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-domain</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
import kotlin.reflect.KClass
import org.slf4j.LoggerFactory
import org.slf4j.MDC
+import org.slf4j.Marker
typealias MappedDiagnosticContext = () -> Map<String, String>
fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block()
fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
- errorLogger.withMdc(mdc, block)
+ errorLogger.withMdc(mdc, block)
fun error(message: () -> String) = errorLogger.run {
log(message())
}
fun error(mdc: MappedDiagnosticContext, message: () -> String) =
- errorLogger.withMdc(mdc) { log(message()) }
-
- fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
- errorLogger.withMdc(mdc) { log(marker, message()) }
-
- fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
- errorLogger.withMdc(mdc) { log(marker, message(), t) }
+ errorLogger.withMdc(mdc) { log(message()) }
// WARN
fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
- warnLogger.withMdc(mdc, block)
+ warnLogger.withMdc(mdc, block)
fun warn(message: () -> String) = warnLogger.run {
log(message())
}
fun warn(mdc: MappedDiagnosticContext, message: () -> String) =
- warnLogger.withMdc(mdc) { log(message()) }
-
- fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
- warnLogger.withMdc(mdc) { log(marker, message()) }
-
- fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) =
- warnLogger.withMdc(mdc) { log(marker, message(), t) }
+ warnLogger.withMdc(mdc) { log(message()) }
// INFO
fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block()
fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
- infoLogger.withMdc(mdc, block)
+ infoLogger.withMdc(mdc, block)
fun info(message: () -> String) = infoLogger.run {
log(message())
}
fun info(mdc: MappedDiagnosticContext, message: () -> String) =
- infoLogger.withMdc(mdc) { log(message()) }
-
- fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
- infoLogger.withMdc(mdc) { log(marker, message()) }
+ infoLogger.withMdc(mdc) { log(message()) }
// DEBUG
fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block()
fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
- debugLogger.withMdc(mdc, block)
+ debugLogger.withMdc(mdc, block)
fun debug(message: () -> String) = debugLogger.run {
log(message())
}
fun debug(mdc: MappedDiagnosticContext, message: () -> String) =
- debugLogger.withMdc(mdc) { log(message()) }
-
- fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
- debugLogger.withMdc(mdc) { log(marker, message()) }
+ debugLogger.withMdc(mdc) { log(message()) }
// TRACE
fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block()
fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
- traceLogger.withMdc(mdc, block)
+ traceLogger.withMdc(mdc, block)
fun trace(message: () -> String) = traceLogger.run {
log(message())
}
fun trace(mdc: MappedDiagnosticContext, message: () -> String) =
- traceLogger.withMdc(mdc) { log(message()) }
-
- fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
- traceLogger.withMdc(mdc) { log(marker, message()) }
+ traceLogger.withMdc(mdc) { log(message()) }
companion object {
fun setLogLevel(packageName: String, level: LogLevel) {
}
}
}
-
- protected fun withAdditionalMdc(mdc: Map<String, String>, block: () -> Unit) {
- if (mdc.isEmpty()) {
- block()
- } else {
- try {
- mdc.forEach(MDC::put)
- block()
- } finally {
- mdc.keys.forEach(MDC::remove)
- }
- }
- }
}
object OffLevelLogger : AtLevelLogger() {
}
override fun log(marker: Marker, message: String) =
- withAdditionalMdc(marker.mdc) {
- logger.error(marker.slf4jMarker, message)
- }
+ logger.error(marker, message)
override fun log(marker: Marker, message: String, t: Throwable) =
- withAdditionalMdc(marker.mdc) {
- logger.error(marker.slf4jMarker, message, t)
- }
+ logger.error(marker, message, t)
}
@Suppress("SuboptimalLoggerUsage")
}
override fun log(marker: Marker, message: String) =
- withAdditionalMdc(marker.mdc) {
- logger.warn(marker.slf4jMarker, message)
- }
+ logger.warn(marker, message)
override fun log(marker: Marker, message: String, t: Throwable) =
- withAdditionalMdc(marker.mdc) {
- logger.warn(marker.slf4jMarker, message, t)
- }
+ logger.warn(marker, message, t)
}
@Suppress("SuboptimalLoggerUsage")
}
override fun log(marker: Marker, message: String) =
- withAdditionalMdc(marker.mdc) {
- logger.info(marker.slf4jMarker, message)
- }
+ logger.info(marker, message)
override fun log(marker: Marker, message: String, t: Throwable) =
- withAdditionalMdc(marker.mdc) {
- logger.info(marker.slf4jMarker, message, t)
- }
+ logger.info(marker, message, t)
}
@Suppress("SuboptimalLoggerUsage")
}
override fun log(marker: Marker, message: String) =
- withAdditionalMdc(marker.mdc) {
- logger.debug(marker.slf4jMarker, message)
- }
+ logger.debug(marker, message)
override fun log(marker: Marker, message: String, t: Throwable) =
- withAdditionalMdc(marker.mdc) {
- logger.debug(marker.slf4jMarker, message, t)
- }
+ logger.debug(marker, message, t)
}
@Suppress("SuboptimalLoggerUsage")
}
override fun log(marker: Marker, message: String) =
- withAdditionalMdc(marker.mdc) {
- logger.trace(marker.slf4jMarker, message)
- }
+ logger.trace(marker, message)
override fun log(marker: Marker, message: String, t: Throwable) =
- withAdditionalMdc(marker.mdc) {
- logger.trace(marker.slf4jMarker, message, t)
- }
+ logger.trace(marker, message, t)
}
<module>hv-collector-domain</module>
<module>hv-collector-health-check</module>
<module>hv-collector-main</module>
+ <module>hv-collector-server</module>
<module>hv-collector-ssl</module>
<module>hv-collector-test-utils</module>
<module>hv-collector-utils</module>