import java.net.URL;
import org.jetbrains.annotations.NotNull;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
.diagnosticContext(request.diagnosticContext())
.build())
.flatMap(httpClient::call)
+ .doOnNext(HttpResponse::throwIfUnsuccessful)
.map(resp -> resp.bodyAsJson(JsonObject.class))
.doOnNext(this::logCbsResponse);
}
package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
+import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString;
import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType;
-import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA;
-import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER;
import com.google.gson.JsonObject;
import io.vavr.collection.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource;
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
.verify(Duration.ofSeconds(5));
}
+ @Test
+ void testCbsClientWhenTheConfigurationWasNotFound() {
+ // given
+ final EnvProperties unknownAppEnv = ImmutableEnvProperties.copyOf(sampleEnvironment).withAppName("unknown_app");
+ final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(unknownAppEnv);
+ final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create());
+
+ // when
+ final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request));
+
+ // then
+ StepVerifier.create(result)
+ .expectError(HttpException.class)
+ .verify(Duration.ofSeconds(5));
+ }
+
private String sampleConfigValue(JsonObject obj) {
return obj.get(SAMPLE_CONFIG_KEY).getAsString();
}