1 package org.onap.dcae.collectors.veshv.main.impl
3 import com.google.protobuf.ByteString
4 import io.netty.buffer.ByteBuf
5 import io.netty.buffer.Unpooled
6 import org.onap.dcae.collectors.veshv.domain.WireFrame
7 import org.onap.ves.VesEventV5
8 import reactor.core.publisher.Flux
9 import reactor.core.publisher.Mono
12 * @author Jakub Dudycz <jakub.dudycz@nokia.com>
15 class MessageFactory {
18 const val DEFAULT_START_EPOCH: Long = 120034455
19 const val DEFAULT_LAST_EPOCH: Long = 120034455
22 fun createMessageFlux(amount: Int = 1): Flux<WireFrame> =
23 Mono.just(createMessage()).repeat(amount.toLong())
26 private fun createMessage(): WireFrame {
27 val commonHeader = VesEventV5.VesEvent.CommonEventHeader.newBuilder()
29 .setEventName("Sample event name")
30 .setDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
31 .setEventId("Sample event Id")
32 .setSourceName("Sample Source")
33 .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
34 .setPriority(VesEventV5.VesEvent.CommonEventHeader.Priority.MEDIUM)
35 .setStartEpochMicrosec(DEFAULT_START_EPOCH)
36 .setLastEpochMicrosec(DEFAULT_LAST_EPOCH)
40 val payload = vesMessageBytes(commonHeader)
46 payloadSize = payload.readableBytes())
51 private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
52 val msg = VesEventV5.VesEvent.newBuilder()
53 .setCommonEventHeader(commonHeader)
54 .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
57 return Unpooled.wrappedBuffer(msg.toByteArray())