import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.impl.MessageValidator
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
return VesHvCollector(
wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
protobufDecoder = VesDecoder(),
- messageValidator = MessageValidator(),
router = Router(config.routing),
sink = sinkProvider(config),
metrics = metrics)
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-internal class MessageValidator {
+internal object MessageValidator {
val requiredFieldDescriptors = listOf(
"version",
"startEpochMicrosec",
"lastEpochMicrosec",
"sequence")
- .map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)}
+ .map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName) }
fun isValid(message: VesMessage): Boolean {
return allMandatoryFieldsArePresent(message.header)
internal class VesHvCollector(
private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
private val protobufDecoder: VesDecoder,
- private val messageValidator: MessageValidator,
private val router: Router,
private val sink: Sink,
private val metrics: Metrics) : Collector {
.transform { decodeWireFrame(it, wireDecoder) }
.filter(PayloadWireFrameMessage::isValid)
.transform(::decodePayload)
- .filter(messageValidator::isValid)
+ .filter(VesMessage::isValid)
.transform(::routeMessage)
.doOnTerminate { releaseBuffersMemory(wireDecoder) }
.onErrorResume(::handleErrors)
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.impl.MessageValidator
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) {
+ fun isValid(): Boolean = MessageValidator.isValid(this)
+}
}
given("Message validator") {
- val cut = MessageValidator()
+ val cut = MessageValidator
on("ves hv message including header with fully initialized fields") {
val commonHeader = newBuilder()