Fix bug when xnf simnulator was using same SecurityKeys object instance for every new VesClient,
which resulted in fault while trying to connect to collector.
With new implementation simulator reuses same HvVesProdcuer from SDK
for every VesEvent request received and creates new Producer for every
WireFrameEvent request. This allows to continue testing cases in which
there is need to assert if connection was dropped from malicious client.
Change-Id: I5f51a58de85cccf7de6ab2392f86259502be31dd
Issue-ID: DCAEGEN2-1291
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
<build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
<jacoco.version>0.8.2</jacoco.version>
<detekt.version>1.0.0-RC11</detekt.version>
- <sdk.version>1.1.2-SNAPSHOT</sdk.version>
+ <sdk.version>1.1.3-SNAPSHOT</sdk.version>
<!-- Protocol buffers -->
<protobuf.version>3.6.1</protobuf.version>
const val TRUST_STORE_FILE = "/etc/ves-hv/trust.p12"
fun createSecurityConfiguration(cmdLine: CommandLine): Try<SecurityConfiguration> =
- if (cmdLine.hasOption(CommandLineOption.SSL_DISABLE))
- Try { disabledSecurityConfiguration() }
+ createSecurityConfigurationProvider(cmdLine).map { it() }
+
+fun createSecurityConfigurationProvider(cmdLine: CommandLine): Try<() -> SecurityConfiguration> =
+ if (shouldDisableSsl(cmdLine))
+ Try { { disabledSecurityConfiguration() } }
else
- enabledSecurityConfiguration(cmdLine)
+ Try { { enabledSecurityConfiguration(cmdLine) } }
+
+private fun shouldDisableSsl(cmdLine: CommandLine) = cmdLine.hasOption(CommandLineOption.SSL_DISABLE)
private fun disabledSecurityConfiguration() = SecurityConfiguration(keys = None)
-private fun enabledSecurityConfiguration(cmdLine: CommandLine) = Try {
+private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
val ksFile = cmdLine.stringValue(CommandLineOption.KEY_STORE_FILE, KEY_STORE_FILE)
val ksPass = cmdLine.stringValue(CommandLineOption.KEY_STORE_PASSWORD).getOrElse { "" }
val tsFile = cmdLine.stringValue(CommandLineOption.TRUST_STORE_FILE, TRUST_STORE_FILE)
.trustStorePassword(Passwords.fromString(tsPass))
.build()
- SecurityConfiguration(keys = Some(keys))
+ return SecurityConfiguration(keys = Some(keys))
}
+
private fun pathFromFile(file: String) = Paths.get(file)
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass
+import reactor.core.Disposable
+import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import java.io.InputStream
+import java.nio.ByteBuffer
import javax.json.Json
import javax.json.JsonArray
private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+ private val defaultHvVesClient by lazy { clientFactory.create() }
+
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
val json = parseJsonArray(messageParameters).bind()
.toEither()
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
- .toFlux()
- .flatMap(::simulate)
- .then(Mono.just(Unit))
- .asIo()
- private fun simulate(parameters: MessageParameters): Mono<Unit> =
+ private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> =
+ parameters
+ .map(::asClientToMessages)
+ .groupMessagesByClients()
+ .flattenValuesToFlux()
+ .toList()
+ .toFlux()
+ .map(::simulate)
+ .then(Mono.just(Unit))
+ .asIo()
+
+ private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() =
+ groupBy({ it.first }, { it.second })
+
+ private fun <K> Map<K, List<Flux<ByteBuffer>>>.flattenValuesToFlux(): Map<K, Flux<ByteBuffer>> =
+ mapValues { Flux.concat(it.value) }
+
+ private fun asClientToMessages(parameters: MessageParameters) =
when (parameters) {
is VesEventParameters -> {
- val messages = vesEventGenerator.createMessageFlux(parameters)
- val client = clientFactory.create()
- client.sendVesEvents(messages)
+ val messages = vesEventGenerator
+ .createMessageFlux(parameters)
+ .map(VesEventOuterClass.VesEvent::toByteBuffer)
+ Pair(defaultHvVesClient, messages)
}
is WireFrameParameters -> {
val messages = wireFrameGenerator.createMessageFlux(parameters)
val client = clientFactory.create(parameters.wireFrameVersion)
- client.sendRawPayload(messages)
+ Pair(client, messages)
}
}
+
+ private fun simulate(pair: Pair<HvVesClient, Flux<ByteBuffer>>): Disposable =
+ pair.first
+ .sendRawPayload(pair.second, PayloadType.PROTOBUF)
+ .subscribe()
}
+
+internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer()
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
-import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
*/
class HvVesClient(private val producer: HvVesProducer) {
- fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> =
- producer.send(messages)
- .then { logger.info { "Ves Events have been sent" } }
+ fun sendRawPayload(messages: Flux<ByteBuffer>, payloadType: PayloadType = PayloadType.UNDEFINED): Mono<Unit> =
+ producer.sendRaw(messages, payloadType)
+ .then { logger.info { "Producer sent raw messages with payload type ${payloadType}" } }
- fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> =
- producer.sendRaw(messages, PayloadType.UNDEFINED)
- .then { logger.info { "Raw messages have been sent" } }
-
companion object {
private val logger = Logger(HvVesClient::class)
}
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
import org.onap.dcae.collectors.veshv.utils.logging.Logger
val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
- val security = createSecurityConfiguration(cmdLine)
- .doOnFailure { ex ->
- logger.withError {
- log("Could not read security keys", ex)
+ val security = createSecurityConfigurationProvider(cmdLine)
+ .doOnFailure { ex ->
+ logger.withError {
+ log("Could not read security keys", ex)
+ }
}
- }
- .toOption()
- .bind()
+ .toOption()
+ .bind()
SimulatorConfiguration(
InetSocketAddress(listenPort),
* @since February 2019
*/
data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>,
- val security: SecurityConfiguration)
+ val securityProvider: () -> SecurityConfiguration)
val healthCheckApiListenAddress: InetSocketAddress,
val hvVesAddress: InetSocketAddress,
val maxPayloadSizeBytes: Int,
- val security: SecurityConfiguration)
+ val securityProvider: () -> SecurityConfiguration)
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since February 2019
*/
-class ClientFactory(configuration: ClientConfiguration) {
+class ClientFactory(private val configuration: ClientConfiguration) {
- private val partialConfig = ImmutableProducerOptions
+ fun create() = hvVesClient(partialConfiguration().build())
+
+ fun create(wireFrameVersion: WireFrameVersion) = hvVesClient(
+ partialConfiguration()
+ .wireFrameVersion(wireFrameVersion)
+ .build())
+
+ private fun partialConfiguration() = ImmutableProducerOptions
.builder()
.collectorAddresses(configuration.collectorAddresses)
- .let { producerOptions ->
- configuration.security.keys.fold(
- { producerOptions },
- { producerOptions.securityKeys(it) })
+ .let { options ->
+ configuration.securityProvider().keys.fold(
+ { options },
+ { options.securityKeys(it) })
}
- fun create(wireFrameVersion: WireFrameVersion): HvVesClient =
- buildClient(partialConfig.wireFrameVersion(wireFrameVersion))
-
-
- fun create(): HvVesClient = buildClient(partialConfig)
+ private fun hvVesClient(producerOptions: ImmutableProducerOptions) =
+ HvVesClient(HvVesProducerFactory.create(producerOptions))
- private fun buildClient(config: ImmutableProducerOptions.Builder) =
- HvVesClient(HvVesProducerFactory.create(config.build()))
}
IO.monad().binding {
logger.info { "Using configuration: $config" }
XnfHealthCheckServer().startServer(config).bind()
- val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security)
+ val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
val xnfSimulator = XnfSimulator(
ClientFactory(clientConfig),
MessageGeneratorFactory(config.maxPayloadSizeBytes)
val hvVesProducer: HvVesProducer = mock()
val cut = HvVesClient(hvVesProducer)
- describe("handling ves events stream") {
-
- val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>()
- whenever(hvVesProducer.send(any())).thenReturn(Mono.empty())
- cut.sendVesEvents(vesEvents)
-
- it("should perform sending operation") {
- verify(hvVesProducer).send(vesEvents)
- }
- }
-
describe("handling raw message stream") {
val rawMessages = Flux.empty<ByteBuffer>()
import arrow.core.None
import arrow.core.Right
import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
import org.onap.ves.VesEventOuterClass
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator)
whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages)
whenever(clientFactory.create()).thenReturn(vesClient)
- whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit))
+
+ whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit))
// when
cut.startSimulation(json).map { it.unsafeRunSync() }
// then
- verify(vesClient).sendVesEvents(generatedMessages)
+ verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF))
}
}
})
\ No newline at end of file