class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
+ private val maximumPayloadSizeBytes: Int,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
- wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
+ wireChunkDecoderSupplier = { alloc ->
+ WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
+ },
protobufDecoder = VesDecoder(),
router = Router(config.routing),
sink = sinkProvider(config),
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
val healthCheckApiPort: Int,
+ val maximumPayloadSizeBytes: Int,
val dummyMode: Boolean = false)
fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
- fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
for (bb in byteBuffers) {
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
WireFrameEncoder(alloc).let { encoder ->
- MessageGenerator.INSTANCE
+ MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
.createMessageFlux(listOf(params))
.map(encoder::encode)
.transform { simulateRemoteTcp(alloc, 1000, it) }
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
private val metrics = FakeMetrics()
- private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider)
+ private val collectorFactory = CollectorFactory(
+ configurationProvider,
+ SinkProvider.just(sink),
+ metrics,
+ MAX_PAYLOAD_SIZE_BYTES,
+ healthStateProvider)
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+
+ companion object {
+ const val MAX_PAYLOAD_SIZE_BYTES = 1024
+ }
+
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesWireFrameMessage(PERF3GPP)
val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithTooBigPayload(PERF3GPP)
+ val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
val handledMessages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP, "first"),
- vesMessageWithTooBigPayload(PERF3GPP),
+ vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
vesWireFrameMessage(PERF3GPP))
assertThat(handledMessages).hasSize(1)
* @since August 2018
*/
class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
- private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) {
+ private val messageStreamValidation: MessageStreamValidation) {
private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
import javax.json.Json
class MessageStreamValidation(
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
- private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+ private val messageGenerator: MessageGenerator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
IO.monadError().bindingCatch {
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
override val cmdLineOptionsList: List<CommandLineOption> = listOf(
LISTEN_PORT,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
KAFKA_SERVERS,
KAFKA_TOPICS
)
val listenPort = cmdLine
.intValue(LISTEN_PORT)
.bind()
+ val maxPayloadSizeBytes = cmdLine
+ .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
val kafkaBootstrapServers = cmdLine
.stringValue(KAFKA_SERVERS)
.bind()
DcaeAppSimConfiguration(
listenPort,
+ maxPayloadSizeBytes,
kafkaBootstrapServers,
kafkaTopics)
}.fix()
data class DcaeAppSimConfiguration(
val apiPort: Int,
+ val maxPayloadSizeBytes: Int,
val kafkaBootstrapServers: String,
val kafkaTopics: Set<String>
)
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
private val logger = Logger(PACKAGE_NAME)
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
+ logger.info("Using configuration: $config")
+ val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
+ val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
.start(config.apiPort, config.kafkaTopics)
.unit()
}
beforeEachTest {
messageParametersParser = mock()
messageGenerator = mock()
- cut = MessageStreamValidation(messageParametersParser, messageGenerator)
+ cut = MessageStreamValidation(messageGenerator, messageParametersParser)
}
fun givenParsedMessageParameters(vararg params: MessageParameters) {
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class WireFrameDecoder {
+class WireFrameDecoder(private val maxPayloadSizeBytes: Int) {
fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
val versionMajor = byteBuf.readUnsignedByte()
val versionMinor = byteBuf.readUnsignedByte()
- byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
+ byteBuf.skipBytes(RESERVED_BYTE_COUNT)
val payloadTypeRaw = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
- if (payloadSize > MAX_PAYLOAD_SIZE) {
+ if (payloadSize > maxPayloadSizeBytes) {
byteBuf.resetReaderIndex()
- return Left(PayloadSizeExceeded)
+ return Left(PayloadSizeExceeded(maxPayloadSizeBytes))
}
if (byteBuf.readableBytes() < payloadSize) {
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
.format(WireFrameMessage.MARKER_BYTE, actualMarker)
)
-object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
+class PayloadSizeExceeded(maxPayloadSizeBytes: Int) :
+ InvalidWireFrame("payload size exceeds the limit ($maxPayloadSizeBytes bytes)")
// Missing bytes errors
RESERVED_BYTE_COUNT * java.lang.Byte.BYTES + // reserved bytes
1 * java.lang.Integer.BYTES // payload length
- const val MAX_PAYLOAD_SIZE = 1024 * 1024
+ const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
}
}
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
*/
object WireFrameCodecsTest : Spek({
val payloadAsString = "coffeebabe"
+ val maxPayloadSizeBytes = 1024
val encoder = WireFrameEncoder()
- val decoder = WireFrameDecoder()
+ val decoder = WireFrameDecoder(maxPayloadSizeBytes)
fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
it("should decode successfully when payload size is equal 1 MiB") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE)
+ val payload = ByteArray(maxPayloadSizeBytes)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
it("should return error when payload exceeds 1 MiB") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
+ val payload = ByteArray(maxPayloadSizeBytes + 1)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
it("should validate only first message") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE)
+ val payload = ByteArray(maxPayloadSizeBytes)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
TRUST_STORE_FILE,
TRUST_STORE_PASSWORD,
IDLE_TIMEOUT_SEC,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
DUMMY_MODE
)
)
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+ val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, DefaultValues.MAX_PAYLOAD_SIZE_BYTES)
val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine).bind()
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+ maximumPayloadSizeBytes = maxPayloadSizeBytes,
dummyMode = dummyMode)
}.fix()
const val CONSUL_FIRST_REQUEST_DELAY = 10L
const val CONSUL_REQUEST_INTERVAL = 5L
const val IDLE_TIMEOUT_SEC = 60L
+ const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
}
}
private fun startAndAwaitServers(config: ServerConfiguration) =
IO.monad().binding {
+ logger.info("Using configuration: $config")
HealthCheckServer.start(config).bind()
VesServer.start(config).bind()
.await().bind()
val collectorProvider = CollectorFactory(
AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
sink,
- MicrometerMetrics()
+ MicrometerMetrics(),
+ config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
return ServerFactory.createNettyTcpServer(config, collectorProvider)
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
writeByte(0x01) // version minor
}
-fun vesMessageWithTooBigPayload(domain: VesEventDomain = PERF3GPP): ByteBuf =
+fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
allocator.buffer().run {
writeValidWireFrameHeaders()
val gpb = vesEvent(
domain = domain,
- eventFields = ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes))
).toByteString().asReadOnlyByteBuffer()
writeInt(gpb.limit()) // ves event size in bytes
|connection might be closed.""".trimMargin())
.build()
),
+ MAXIMUM_PAYLOAD_SIZE_BYTES(Option.builder("m")
+ .longOpt("max-payload-size")
+ .hasArg()
+ .desc("Maximum supported payload size in bytes")
+ .build()
+ ),
DUMMY_MODE(Option.builder("u")
.longOpt("dummy")
.desc("If present will start in dummy mode (dummy external services)")
package org.onap.dcae.collectors.veshv.ves.message.generator.api
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
import reactor.core.publisher.Flux
/**
fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
companion object {
- val INSTANCE: MessageGenerator by lazy {
- MessageGeneratorImpl(PayloadGenerator())
- }
-
const val FIXED_PAYLOAD_SIZE = 100
}
}
--- /dev/null
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.factory
+
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since October 2018
+ */
+object MessageGeneratorFactory {
+ fun create(maxPayloadSizeBytes: Int): MessageGenerator =
+ MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
+}
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
+class MessageGeneratorImpl internal constructor(
+ private val payloadGenerator: PayloadGenerator,
+ private val maxPayloadSizeBytes: Int
+) : MessageGenerator {
override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux
.fromIterable(messageParameters)
.build()
private fun oversizedPayload() =
- payloadGenerator.generateRawPayload(WireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+ payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1)
private fun fixedPayload() =
payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import com.google.protobuf.ByteString
import com.google.protobuf.InvalidProtocolBufferException
*/
object MessageGeneratorImplTest : Spek({
describe("message factory") {
- val generator = MessageGenerator.INSTANCE
+ val maxPayloadSizeBytes = 1024
+ val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
given("single message parameters") {
on("messages amount not specified in parameters") {
it("should create infinite flux") {
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
.verifyComplete()
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
}
.verifyComplete()
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
.isThrownBy { extractCommonEventHeader(it.payload) }
}
.test()
.assertNext {
assertThat(it.isValid()).isFalse()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
}
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
generator.createMessageFlux(messageParameters)
.test()
.assertNext {
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
- assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.domainName)
}
.expectNextCount(singleFluxSize - 1)
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
private const val EXPECTED_MESSAGES_AMOUNT = 25000L
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import javax.json.Json
*/
class XnfSimulator(
private val vesClient: VesHvClient,
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
- private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+ private val messageGenerator: MessageGenerator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
import arrow.core.Either
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.utils.http.Content
-import org.onap.dcae.collectors.veshv.utils.http.ContentType
import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
import org.onap.dcae.collectors.veshv.utils.http.Response
import org.onap.dcae.collectors.veshv.utils.http.Responses
import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
import java.util.*
-import javax.json.Json
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
VES_HV_PORT,
VES_HV_HOST,
LISTEN_PORT,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
SSL_DISABLE,
KEY_STORE_FILE,
KEY_STORE_PASSWORD,
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
+ val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
SimulatorConfiguration(
listenPort,
vesHost,
vesPort,
+ maxPayloadSizeBytes,
createSecurityConfiguration(cmdLine).bind())
}.fix()
+
}
val listenPort: Int,
val vesHost: String,
val vesPort: Int,
+ val maxPayloadSizeBytes: Int,
val security: SecurityConfiguration)
import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf"
private val logger = Logger(PACKAGE_NAME)
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
.map { config ->
- XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations())
+ logger.info("Using configuration: $config")
+ val xnfSimulator = XnfSimulator(
+ VesHvClient(config),
+ MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ XnfApiServer(xnfSimulator, OngoingSimulations())
.start(config.listenPort)
.unit()
}
vesClient = mock()
messageParametersParser = mock()
messageGenerator = mock()
- cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator)
+ cut = XnfSimulator(vesClient, messageGenerator, messageParametersParser)
}
describe("startSimulation") {