import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream;
+ " \"ServicePort\": PORT\n"
+ " }\n"
+ "]\n";
- private static final String SAMPLE_CONFIG = "/sample_config.json";
+ private static final String SAMPLE_CONFIG = "/sample_service_config.json";
+ private static final String SAMPLE_ALL = "/sample_all.json";
+ private static final String SAMPLE_KEY = "/sample_key.json";
private static final String SAMPLE_CONFIG_KEY = "keystore.path";
private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12";
private static EnvProperties sampleEnvironment;
static void setUp() {
server = DummyHttpServer.start(routes ->
routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse()))
- .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG)));
+ .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))
+ .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL))
+ .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY))
+ );
sampleEnvironment = ImmutableEnvProperties.builder()
.appName("dcae-component")
.cbsName("the_cbs")
void testCbsClientWithSingleCall() {
// given
final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
- final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
// when
- final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext));
+ final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
// then
StepVerifier.create(result.map(this::sampleConfigValue))
void testCbsClientWithPeriodicCall() {
// given
final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
- final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
// when
final Flux<JsonObject> result = sut
- .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10)));
+ .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10)));
// then
final int itemsToTake = 5;
void testCbsClientWithUpdatesCall() {
// given
final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
- final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
final Duration period = Duration.ofMillis(10);
// when
final Flux<JsonObject> result = sut
- .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period));
+ .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period));
// then
final Duration timeToCollectItemsFor = period.multipliedBy(50);
// given
final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
- final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
// when
- final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request))
.map(json ->
DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head()
);
void testCbsClientWithStreamsParsingUsingSwitch() {
// given
final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
- final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
// TODO: Use these parsers below
final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser();
final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser();
// when
- final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request))
.map(json -> {
final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json)
.groupBy(RawDataStream::type);
// given
final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser();
- final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create();
+ final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
// when
- final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext))
+ final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request))
.map(json ->
DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head()
);
.verify(Duration.ofSeconds(5));
}
+ @Test
+ void testCbsClientWithSingleAllRequest() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create());
+
+ // when
+ final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
+
+ // then
+ StepVerifier.create(result)
+ .assertNext(json -> {
+ assertThat(json.get("config")).isNotNull();
+ assertThat(json.get("policies")).isNotNull();
+ assertThat(json.get("sampleKey")).isNotNull();
+ })
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
+
+ @Test
+ void testCbsClientWithSingleKeyRequest() {
+ // given
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment);
+ final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey");
+
+ // when
+ final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
+
+ // then
+ StepVerifier.create(result)
+ .assertNext(json -> {
+ assertThat(json.get("key")).isNotNull();
+ assertThat(json.get("key").getAsString()).isEqualTo("value");
+ })
+ .expectComplete()
+ .verify(Duration.ofSeconds(5));
+ }
+
private String sampleConfigValue(JsonObject obj) {
return obj.get(SAMPLE_CONFIG_KEY).getAsString();
}