import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
import reactor.core.publisher.Flux
interface Sink {
internal object MessageValidator {
- val requiredFieldDescriptors = listOf(
+ private val requiredFieldDescriptors = listOf(
"version",
"eventName",
// "domain", TODO to be restored back when GPB schema will include default value
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.ipc.netty.http.client.HttpClientException
-import reactor.retry.Retry
-import reactor.retry.retryExponentialBackoff
import java.io.StringReader
import java.time.Duration
import java.util.*
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import javax.json.Json
import javax.json.JsonObject
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.core.publisher.Flux
import com.nhaarman.mockito_kotlin.eq
import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.mockito.Mockito
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
-import reactor.ipc.netty.http.client.HttpClient
import reactor.test.StepVerifier
import java.time.Duration
import java.util.*
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-xnf-simulator</artifactId>
+ <artifactId>hv-collector-dcae-app-simulator</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-dcae-app-simulator</artifactId>
+ <artifactId>hv-collector-ves-message-generator</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-xnf-simulator</artifactId>
<version>${project.parent.version}</version>
</dependency>
</dependencies>
package org.onap.dcae.collectors.veshv.tests.component
import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
class Sut(sink: Sink = StoringSink()) {
val configurationProvider = FakeConfigurationProvider()
- val alloc = UnpooledByteBufAllocator.DEFAULT
- val metrics = FakeMetrics()
+ val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
+ private val metrics = FakeMetrics()
private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics)
- val collectorProvider = collectorFactory.createVesHvCollectorProvider()
+ private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
get() = collectorProvider()
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.ves.VesEventV5
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
fun vesEvent(domain: Domain = Domain.HVRANMEAS,
id: String = UUID.randomUUID().toString(),
- hvRanMeasFields: ByteString = ByteString.EMPTY) =
+ hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventV5.VesEvent =
VesEvent.newBuilder()
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()
}
}
-class ConsumerFactory(val kafkaBootstrapServers: String) {
+class ConsumerFactory(private val kafkaBootstrapServers: String) {
fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider {
return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync()
}
chain
.put("configuration/topics") { ctx ->
ctx.request.body.then { it ->
- val topics = extractTopics(it.getText())
+ val topics = extractTopics(it.text)
logger.info("Received new configuration. Creating consumer for topics: $topics")
consumerState = consumerFactory.createConsumerForTopics(topics)
ctx.response.contentType(CONTENT_TEXT)
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
-import arrow.core.Failure
-import arrow.core.Success
import arrow.core.identity
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
}
- it("should set proper kafka boostrap servers") {
+ it("should set proper kafka bootstrap servers") {
assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers)
}
assertThat(result.apiPort).isEqualTo(666)
}
- it("should set proper kafka boostrap servers") {
+ it("should set proper kafka bootstrap servers") {
assertThat(result.kafkaBootstrapServers).isEqualTo(kafkaBootstrapServers)
}
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class WireFrameEncoder(val allocator: ByteBufAllocator) {
+class WireFrameEncoder(private val allocator: ByteBufAllocator) {
fun encode(frame: PayloadWireFrameMessage): ByteBuf {
val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.core.Failure
-import arrow.core.Success
import arrow.core.identity
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import arrow.core.Option
import arrow.core.Try
import arrow.core.getOrElse
-import arrow.core.recoverWith
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.CommandLineParser
import org.apache.commons.cli.Options
on("call with specified parameters") {
val numOfCountPerMeas: Long = 5
- val numOfMeasPerObject: Int = 10
+ val numOfMeasPerObject = 10
val generatedPayload = payloadGenerator.generatePayload(numOfCountPerMeas, numOfMeasPerObject)
it("should contain specified number of measurements") {
assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(numOfMeasPerObject)
<modules>
<module>hv-collector-analysis</module>
- <module>hv-collector-xnf-simulator</module>
<module>hv-collector-core</module>
<module>hv-collector-coverage</module>
<module>hv-collector-ct</module>
<module>hv-collector-main</module>
<module>hv-collector-utils</module>
<module>hv-collector-ves-message-generator</module>
+ <module>hv-collector-xnf-simulator</module>
</modules>
<properties>