Update SDK usage to comply with new API 57/83357/2
authorFilip Krzywka <filip.krzywka@nokia.com>
Tue, 26 Mar 2019 14:01:22 +0000 (15:01 +0100)
committerFilip Krzywka <filip.krzywka@nokia.com>
Wed, 27 Mar 2019 07:07:10 +0000 (08:07 +0100)
Change-Id: Ifa5847617b7cadaadd80a82a3b279ac2beb4b258
Issue-ID: DCAEGEN2-1347
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt

index f475a0e..0a1e2d4 100644 (file)
@@ -26,7 +26,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.utils.Closeable
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import reactor.core.publisher.Flux
 
 interface Sink {
index c674ef3..fa4f967 100644 (file)
@@ -36,7 +36,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import java.util.concurrent.atomic.AtomicReference
 
 /**
index 723ba39..d2c35cb 100644 (file)
@@ -27,7 +27,7 @@ import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 
 class Router(private val routing: Routing, private val ctx: ClientContext) {
 
index 5b0dca2..f9fd698 100644 (file)
@@ -27,12 +27,14 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
+import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -89,16 +91,15 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
             .retryWhen(retry)
 
 
-    private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> {
-        try {
-            val sinks = DataStreams.namedSinks(configuration)
-                    .filter { it.type() == "kafka" }
-            return sinks.map(streamParser::unsafeParse).asSequence()
-
-        } catch (e: NullPointerException) {
-            throw ParsingException("Failed to parse configuration", e)
-        }
-    }
+    private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
+            try {
+                DataStreams.namedSinks(configuration)
+                        .filter(streamOfType(KAFKA))
+                        .map(streamParser::unsafeParse)
+                        .asSequence()
+            } catch (e: NullPointerException) {
+                throw ParsingException("Failed to parse configuration", e)
+            }
 
     companion object {
         private const val MAX_RETRIES = 5L
index e71250c..571a668 100644 (file)
@@ -32,9 +32,9 @@ import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
index da9290d..eb3ba26 100644 (file)
@@ -39,8 +39,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import reactor.core.publisher.Flux
 import java.time.Duration
 import java.util.concurrent.atomic.AtomicBoolean
index 213eff2..6599d40 100644 (file)
 package org.onap.dcae.collectors.veshv.tests.fakes
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.ImmutableKafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
 import reactor.core.publisher.FluxProcessor
 import reactor.core.publisher.UnicastProcessor
 import reactor.retry.RetryExhaustedException