Remove Spring stuff from DMaaP 78/79278/6
authorMarcin Migdal <marcin.migdal@nokia.com>
Wed, 27 Feb 2019 16:10:03 +0000 (17:10 +0100)
committerMarcin Migdal <marcin.migdal@nokia.com>
Thu, 28 Feb 2019 14:02:35 +0000 (15:02 +0100)
Change-Id: Id441d0dce89fcb24b5558af7bdf996a74eba78e6
Issue-ID: DCAEGEN2-1245
Signed-off-by: Marcin Migdal <marcin.migdal@nokia.com>
13 files changed:
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java
rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/providers/ReactiveCloudConfigurationProvider.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java
rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/providers/ReactiveCloudConfigurationProviderTest.java
rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClient.java [moved from rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/adapters/CloudHttpClient.java with 71% similarity]
rest-services/dmaap-client/pom.xml
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java
rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactory.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPReactiveWebClientFactoryTest.java

index 7a46317..c2bc823 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.adapters.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import reactor.core.publisher.Mono;
 
 /**
index d11be24..05bfc9b 100644 (file)
@@ -26,7 +26,7 @@ import java.net.URL;
 import org.jetbrains.annotations.NotNull;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.adapters.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import reactor.core.publisher.Mono;
 
 public class CbsClientImpl implements CbsClient {
index f5ec462..9eba275 100644 (file)
@@ -24,7 +24,7 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import java.net.InetSocketAddress;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.adapters.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import reactor.core.publisher.Mono;
 
 /**
index 02e9b9c..4052b4a 100644 (file)
@@ -23,7 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.adapters.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.uri.URI;
 import org.slf4j.Logger;
index 606d00b..9fd7cc8 100644 (file)
@@ -31,7 +31,7 @@ import com.google.gson.JsonObject;
 import java.net.InetSocketAddress;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.adapters.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import reactor.core.publisher.Mono;
 
 /**
index b46b958..858e938 100644 (file)
@@ -33,7 +33,7 @@ import java.net.InetSocketAddress;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.EnvProperties;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.adapters.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
index 3f720c3..c711226 100644 (file)
@@ -28,7 +28,7 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.ImmutableEnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.adapters.CloudHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.EnvProperties;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
  * ============LICENSE_END=====================================
  */
 
-package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.adapters;
+package org.onap.dcaegen2.services.sdk.rest.services.adapters.http;
 
 import com.google.gson.Gson;
 import io.netty.handler.codec.http.HttpStatusClass;
+import io.netty.handler.ssl.SslContext;
 import io.vavr.collection.Stream;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
-import reactor.netty.Connection;
 import reactor.netty.http.client.HttpClient;
 import reactor.netty.http.client.HttpClientRequest;
 import reactor.netty.http.client.HttpClientResponse;
@@ -42,7 +44,6 @@ import reactor.netty.http.client.HttpClientResponse;
 public class CloudHttpClient {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(CloudHttpClient.class);
-
     private final Gson gson = new Gson();
     private final HttpClient httpClient;
 
@@ -50,16 +51,25 @@ public class CloudHttpClient {
         this(HttpClient.create());
     }
 
+    public CloudHttpClient(SslContext sslContext) {
+        this(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
+    }
 
-    CloudHttpClient(HttpClient httpClient) {
+    private CloudHttpClient(HttpClient httpClient) {
         this.httpClient = httpClient;
+
     }
 
     public <T> Mono<T> get(String url, RequestDiagnosticContext context, Class<T> bodyClass) {
+        return get(url, context, Collections.EMPTY_MAP, bodyClass);
+    }
+
+    public <T> Mono<T> get(String url, RequestDiagnosticContext context, Map<String, String> customHeaders, Class<T> bodyClass) {
         final HttpClient clientWithHeaders = httpClient
-                .doOnRequest((req, conn) -> logRequest(context, req))
-                .doOnResponse((rsp, conn) -> logResponse(context, rsp))
-                .headers(hdrs -> context.remoteCallHttpHeaders().forEach((BiConsumer<String, String>) hdrs::set));
+            .doOnRequest((req, conn) -> logRequest(context, req))
+            .doOnResponse((rsp, conn) -> logResponse(context, rsp))
+            .headers(hdrs -> context.remoteCallHttpHeaders().forEach((BiConsumer<String, String>) hdrs::set))
+            .headers(hdrs -> customHeaders.forEach(hdrs::set));
         return callHttpGet(clientWithHeaders, url, bodyClass);
     }
 
@@ -69,17 +79,17 @@ public class CloudHttpClient {
 
     private <T> Mono<T> callHttpGet(HttpClient client, String url, Class<T> bodyClass) {
         return client.get()
-                .uri(url)
-                .responseSingle((resp, content) -> HttpStatusClass.SUCCESS.contains(resp.status().code())
-                        ? content.asString()
-                        : Mono.error(createException(url, resp)))
-                .map(body -> parseJson(body, bodyClass));
+            .uri(url)
+            .responseSingle((resp, content) -> HttpStatusClass.SUCCESS.contains(resp.status().code())
+                ? content.asString()
+                : Mono.error(createException(url, resp)))
+            .map(body -> parseJson(body, bodyClass));
     }
 
     private Exception createException(String url, HttpClientResponse response) {
         return new IOException(String.format("Request failed for URL '%s'. Response code: %s",
-                url,
-                response.status()));
+            url,
+            response.status()));
     }
 
     private <T> T parseJson(String body, Class<T> bodyClass) {
@@ -91,8 +101,8 @@ public class CloudHttpClient {
             LOGGER.debug("Request: {} {}", httpClientRequest.method(), httpClientRequest.uri());
             if (LOGGER.isTraceEnabled()) {
                 final String headers = Stream.ofAll(httpClientRequest.requestHeaders())
-                        .map(entry -> entry.getKey() + "=" + entry.getValue())
-                        .collect(Collectors.joining("\n"));
+                    .map(entry -> entry.getKey() + "=" + entry.getValue())
+                    .collect(Collectors.joining("\n"));
                 LOGGER.trace(headers);
             }
         });
index 598913e..2834191 100644 (file)
@@ -68,7 +68,7 @@
     <dependency>
       <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
       <artifactId>aai-client</artifactId>
-      <version>1.1.2-SNAPSHOT</version>
+      <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
index d92aef9..99f7020 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.REQUEST_ID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
-
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
-import java.util.function.Consumer;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder;
-import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
 
 
@@ -41,6 +38,8 @@ import reactor.core.publisher.Mono;
  */
 public class DMaaPConsumerReactiveHttpClient {
 
+    private final static String SLASH = "/";
+    private final static String CONTENT_TYPE = "Content-Type";
     private final String dmaapHostName;
     private final String dmaapProtocol;
     private final Integer dmaapPortNumber;
@@ -48,15 +47,16 @@ public class DMaaPConsumerReactiveHttpClient {
     private final String consumerGroup;
     private final String consumerId;
     private final String contentType;
-    private final WebClient webClient;
-    private final static String SLASH="/";
+    private final CloudHttpClient cloudHttpClient;
 
     /**
      * Constructor of DMaaPConsumerReactiveHttpClient.
      *
      * @param consumerConfiguration - DMaaP consumer configuration object
      */
-    public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration, WebClient webClient) {
+
+    public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration,
+        CloudHttpClient cloudHttpClient) {
         this.dmaapHostName = consumerConfiguration.dmaapHostName();
         this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
         this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
@@ -64,7 +64,7 @@ public class DMaaPConsumerReactiveHttpClient {
         this.consumerGroup = consumerConfiguration.consumerGroup();
         this.consumerId = consumerConfiguration.consumerId();
         this.contentType = consumerConfiguration.dmaapContentType();
-        this.webClient = webClient;
+        this.cloudHttpClient = cloudHttpClient;
     }
 
     /**
@@ -72,30 +72,15 @@ public class DMaaPConsumerReactiveHttpClient {
      *
      * @return reactive response from DMaaP in string format
      */
-    public Mono<String> getDMaaPConsumerResponse() {
-        return webClient
-            .get()
-            .uri(getUri())
-            .headers(getHeaders())
-            .retrieve()
-            .onStatus(HttpStatus::is4xxClientError, clientResponse ->
-                Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
-            )
-            .onStatus(HttpStatus::is5xxServerError, clientResponse ->
-                Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())))
-            .bodyToMono(String.class);
-    }
-
-    private Consumer<HttpHeaders> getHeaders() {
-        return httpHeaders -> {
-            httpHeaders.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
-            httpHeaders.set(X_INVOCATION_ID, UUID.randomUUID().toString());
-            httpHeaders.set(HttpHeaders.CONTENT_TYPE, contentType);
-        };
-    }
-
-    private String createRequestPath() {
-        return  new StringBuilder().append(SLASH).append(dmaapTopicName).append(SLASH).append(consumerGroup).append(SLASH).append(consumerId).toString();
+    public Mono<String> getDMaaPConsumerResponse(Optional<RequestDiagnosticContext> requestDiagnosticContextOptional) {
+        Map<String,String> headers = new HashMap<>();
+        headers.put(CONTENT_TYPE,contentType);
+        if (requestDiagnosticContextOptional.isPresent()) {
+            return cloudHttpClient.get(getUri().toString(), requestDiagnosticContextOptional.get(),headers, String.class);
+        }
+        RequestDiagnosticContext requestDiagnosticContext = ImmutableRequestDiagnosticContext.builder()
+            .invocationId(UUID.randomUUID()).requestId(UUID.randomUUID()).build();
+        return cloudHttpClient.get(getUri().toString(), requestDiagnosticContext, headers, String.class);
     }
 
     URI getUri() {
@@ -103,4 +88,9 @@ public class DMaaPConsumerReactiveHttpClient {
             new URIBuilder().scheme(dmaapProtocol).host(dmaapHostName).port(dmaapPortNumber).path(createRequestPath())
                 .build().toString());
     }
+
+    private String createRequestPath() {
+        return new StringBuilder().append(SLASH).append(dmaapTopicName).append(SLASH).append(consumerGroup)
+            .append(SLASH).append(consumerId).toString();
+    }
 }
index b1f2ab0..fba6f18 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
 
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
-import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.SERVICE_NAME;
-
 import io.netty.handler.ssl.SslContext;
 import javax.net.ssl.SSLException;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-import org.springframework.http.client.reactive.ClientHttpConnector;
-import org.springframework.http.client.reactive.ReactorClientHttpConnector;
-import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
-import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-import reactor.netty.http.client.HttpClient;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
  */
 public class DMaaPReactiveWebClientFactory {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
     private final SslFactory sslFactory;
 
     public DMaaPReactiveWebClientFactory() {
@@ -55,49 +42,23 @@ public class DMaaPReactiveWebClientFactory {
     }
 
     /**
-     * Construct Reactive WebClient with appropriate settings.
+     * Construct CloudHttpClient with appropriate settings.
      *
-     * @return WebClient
+     * @return CloudHttpClient
      */
-    public WebClient build(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
+
+    public CloudHttpClient build(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
         SslContext sslContext = createSslContext(consumerConfiguration);
-        ClientHttpConnector reactorClientHttpConnector = new ReactorClientHttpConnector(
-                HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext)));
-        return WebClient.builder()
-                .clientConnector(reactorClientHttpConnector)
-                .filter(logRequest())
-                .filter(logResponse())
-                .build();
+        return new CloudHttpClient(sslContext);
     }
 
     private SslContext createSslContext(DmaapConsumerConfiguration consumerConfiguration) throws SSLException {
         if (consumerConfiguration.enableDmaapCertAuth()) {
             return sslFactory.createSecureContext(
-                    consumerConfiguration.keyStorePath(), consumerConfiguration.keyStorePasswordPath(),
-                    consumerConfiguration.trustStorePath(), consumerConfiguration.trustStorePasswordPath()
+                consumerConfiguration.keyStorePath(), consumerConfiguration.keyStorePasswordPath(),
+                consumerConfiguration.trustStorePath(), consumerConfiguration.trustStorePasswordPath()
             );
         }
         return sslFactory.createInsecureContext();
     }
-
-    private ExchangeFilterFunction logResponse() {
-        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
-            MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
-            logger.info("Response Status {}", clientResponse.statusCode());
-            MDC.remove(RESPONSE_CODE);
-            return Mono.just(clientResponse);
-        });
-    }
-
-    private ExchangeFilterFunction logRequest() {
-        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
-            MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
-            logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
-            clientRequest.headers()
-                .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
-            MDC.remove(SERVICE_NAME);
-            return Mono.just(clientRequest);
-        });
-    }
-
 }
index 9a4a130..4c78901 100644 (file)
 
 package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
 
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.http.entity.ContentType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -47,11 +45,11 @@ class DMaaPConsumerReactiveHttpClientTest {
     private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
     private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
     private DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class);
-    private Mono<String> expectedResult = Mono.empty();
-    private WebClient webClient;
-    private RequestHeadersUriSpec requestHeadersSpec;
-    private ResponseSpec responseSpec;
-
+    private Mono<String> expectedResult;
+    private CloudHttpClient httpClient = mock(CloudHttpClient.class);
+    private URI exampleTestUri = URI
+        .create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12");
+    private RequestDiagnosticContext requestDiagnosticContext = mock(RequestDiagnosticContext.class);
 
     @BeforeEach
     void setUp() {
@@ -60,32 +58,22 @@ class DMaaPConsumerReactiveHttpClientTest {
         when(consumerConfigurationMock.dmaapPortNumber()).thenReturn(1234);
         when(consumerConfigurationMock.dmaapUserName()).thenReturn("PRH");
         when(consumerConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
-        when(consumerConfigurationMock.dmaapContentType()).thenReturn("application/json");
+        when(consumerConfigurationMock.dmaapContentType()).thenReturn(ContentType.APPLICATION_JSON.getMimeType());
         when(consumerConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
         when(consumerConfigurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
         when(consumerConfigurationMock.consumerId()).thenReturn("c12");
-
-        webClient = spy(WebClient.builder()
-            .defaultHeader(HttpHeaders.CONTENT_TYPE, consumerConfigurationMock.dmaapContentType())
-            .filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
-                consumerConfigurationMock.dmaapUserPassword()))
-            .build());
-        dmaapConsumerReactiveHttpClient = new DMaaPConsumerReactiveHttpClient(consumerConfigurationMock, webClient);
-        requestHeadersSpec = mock(RequestHeadersUriSpec.class);
-        responseSpec = mock(ResponseSpec.class);
+        dmaapConsumerReactiveHttpClient = new DMaaPConsumerReactiveHttpClient(consumerConfigurationMock, httpClient);
     }
 
-
     @Test
     void getHttpResponse_Success() {
         //given
         expectedResult = Mono.just(JSON_MESSAGE);
-
+        when(httpClient.get(exampleTestUri.toString(), requestDiagnosticContext, getCustomHeaders(), String.class))
+            .thenReturn(expectedResult);
         //when
-        mockDependantObjects();
-        doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
-        Mono<String> response = dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse();
-
+        Mono<String> response = dmaapConsumerReactiveHttpClient
+            .getDMaaPConsumerResponse(Optional.of(requestDiagnosticContext));
         //then
         StepVerifier.create(response).expectSubscription()
             .expectNextMatches(results -> {
@@ -96,16 +84,13 @@ class DMaaPConsumerReactiveHttpClientTest {
 
     @Test
     void getAppropriateUri_whenPassingCorrectedPathForPnf() {
-        Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
-            URI.create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12"));
+        Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(), exampleTestUri);
     }
 
-    private void mockDependantObjects() {
-        when(webClient.get()).thenReturn(requestHeadersSpec);
-        when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
-        when(requestHeadersSpec.headers(any())).thenReturn(requestHeadersSpec);
-        when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
-        doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+    private Map<String, String> getCustomHeaders() {
+        Map<String, String> customHeaders = new HashMap<>();
+        customHeaders.put("Content-Type", ContentType.APPLICATION_JSON.getMimeType());
+        return customHeaders;
     }
 
 }
\ No newline at end of file
index d9989d1..6fd2200 100644 (file)
@@ -30,7 +30,8 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient;
+
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/5/18
@@ -51,7 +52,7 @@ class DMaaPReactiveWebClientFactoryTest {
         DmaapConsumerConfiguration dmaapConsumerConfiguration = givenDmaapConfigurationWithSslDisabled();
 
         //when
-        WebClient dmaapReactiveWebClient = webClientFactory.build(dmaapConsumerConfiguration);
+        CloudHttpClient dmaapReactiveWebClient = webClientFactory.build(dmaapConsumerConfiguration);
 
         //then
         Assertions.assertNotNull(dmaapReactiveWebClient);
@@ -64,7 +65,7 @@ class DMaaPReactiveWebClientFactoryTest {
         DmaapConsumerConfiguration dmaapConsumerConfiguration = givenDmaapConfigurationWithSslEnabled();
 
         //when
-        WebClient dmaapReactiveWebClient = webClientFactory.build(dmaapConsumerConfiguration);
+        CloudHttpClient dmaapReactiveWebClient = webClientFactory.build(dmaapConsumerConfiguration);
 
         //then
         Assertions.assertNotNull(dmaapReactiveWebClient);