Added reactive DMaaPClient 41/59341/1
authorwasala <przemyslaw.wasala@nokia.com>
Wed, 4 Jul 2018 12:12:23 +0000 (14:12 +0200)
committerwasala <przemyslaw.wasala@nokia.com>
Tue, 7 Aug 2018 07:45:22 +0000 (09:45 +0200)
Extracted WebCLientBuilder for
Producer and Consumer.
Added unit test for ReactiveProducerClient.

Change-Id: I632e6928813ed9feb48982900c173f741e4483e3
Issue-ID: DCAEGEN2-563
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
17 files changed:
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapProducerTaskSpy.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java [new file with mode: 0644]
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapHttpClientImpl.java [deleted file]
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java [moved from prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java with 63% similarity]
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java [new file with mode: 0644]
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/ExtendedDmaapProducerHttpClientImpl.java [deleted file]
prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java [moved from prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java with 82% similarity]
prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java [new file with mode: 0644]
prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/ExtendedDmaapProducerHttpClientImplTest.java [deleted file]

index 5cd30f8..6db36a8 100644 (file)
@@ -22,7 +22,7 @@ package org.onap.dcaegen2.services.prh.tasks;
 import java.util.Optional;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
 import reactor.core.publisher.Mono;
 
 /**
@@ -32,7 +32,7 @@ abstract class DmaapConsumerTask {
 
     abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
 
-    abstract DmaapConsumerReactiveHttpClient resolveClient();
+    abstract DMaaPConsumerReactiveHttpClient resolveClient();
 
     abstract void initConfigs();
 
index 08008f0..90382e5 100644 (file)
@@ -23,11 +23,10 @@ import java.util.Optional;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -43,7 +42,7 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Config prhAppConfig;
     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
-    private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+    private DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
 
     @Autowired
     public DmaapConsumerTaskImpl(AppConfig prhAppConfig) {
@@ -58,16 +57,15 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
     @Override
     Mono<ConsumerDmaapModel> consume(Mono<String> message) {
-        logger.info("Consumed model from DmaaP: {}", message);
+        logger.info("Consumed model from DMaaP: {}", message);
         return dmaapConsumerJsonParser.getJsonObject(message);
     }
 
     @Override
     public Mono<ConsumerDmaapModel> execute(String object) {
-        dmaapConsumerReactiveHttpClient = resolveClient();
-        dmaapConsumerReactiveHttpClient.initWebClient();
+        dMaaPConsumerReactiveHttpClient = resolveClient();
         logger.trace("Method called with arg {}", object);
-        return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()));
+        return consume((dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()));
     }
 
     @Override
@@ -80,8 +78,16 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     }
 
     @Override
-    DmaapConsumerReactiveHttpClient resolveClient() {
-        return Optional.ofNullable(dmaapConsumerReactiveHttpClient)
-            .orElseGet(() -> new DmaapConsumerReactiveHttpClient(resolveConfiguration()));
+    DMaaPConsumerReactiveHttpClient resolveClient() {
+
+        return Optional.ofNullable(dMaaPConsumerReactiveHttpClient)
+            .orElseGet(() -> {
+                DmaapConsumerConfiguration dmaapConsumerConfiguration = resolveConfiguration();
+                return new DMaaPConsumerReactiveHttpClient(dmaapConsumerConfiguration).createDMaaPWebClient(
+                    new DMaaPReactiveWebClient.WebClientBuilder()
+                        .dmaapContentType(dmaapConsumerConfiguration.dmaapContentType())
+                        .dmaapUserName(dmaapConsumerConfiguration.dmaapUserName())
+                        .dmaapUserPassword(dmaapConsumerConfiguration.dmaapUserPassword()).build());
+            });
     }
 }
index 3520d13..27670e2 100644 (file)
@@ -21,16 +21,17 @@ package org.onap.dcaegen2.services.prh.tasks;
 
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
  */
 abstract class DmaapPublisherTask {
 
-    abstract Integer publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+    abstract Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
 
-    abstract ExtendedDmaapProducerHttpClientImpl resolveClient();
+    abstract DMaaPProducerReactiveHttpClient resolveClient();
 
-    protected abstract Integer execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
+    protected abstract Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
 }
index 7cbeb3b..faf43bc 100644 (file)
@@ -25,12 +25,13 @@ import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -40,7 +41,7 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Config prhAppConfig;
-    private ExtendedDmaapProducerHttpClientImpl extendedDmaapProducerHttpClient;
+    private DMaaPProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
 
     @Autowired
     public DmaapPublisherTaskImpl(AppConfig prhAppConfig) {
@@ -48,19 +49,17 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
     }
 
     @Override
-    Integer publish(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
-        logger.info("Publishing on DmaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
+    Mono<Integer> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
+        logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
             consumerDmaapModel);
-        return extendedDmaapProducerHttpClient.getHttpProducerResponse(consumerDmaapModel)
-            .filter(response -> response == HttpStatus.OK.value())
-            .orElseThrow(() -> new DmaapNotFoundException("Incorrect response from Dmaap"));
+        return dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel).map(Integer::parseInt);
     }
 
     @Override
-    public Integer execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
+    public Mono<Integer> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
         consumerDmaapModel = Optional.ofNullable(consumerDmaapModel)
-            .orElseThrow(() -> new DmaapNotFoundException("Invoked null object to Dmaap task"));
-        extendedDmaapProducerHttpClient = resolveClient();
+            .orElseThrow(() -> new DmaapNotFoundException("Invoked null object to DMaaP task"));
+        dMaaPProducerReactiveHttpClient = resolveClient();
         logger.trace("Method called with arg {}", consumerDmaapModel);
         return publish(consumerDmaapModel);
     }
@@ -70,8 +69,15 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
     }
 
     @Override
-    ExtendedDmaapProducerHttpClientImpl resolveClient() {
-        return Optional.ofNullable(extendedDmaapProducerHttpClient)
-            .orElseGet(() -> new ExtendedDmaapProducerHttpClientImpl(resolveConfiguration()));
+    DMaaPProducerReactiveHttpClient resolveClient() {
+        return Optional.ofNullable(dMaaPProducerReactiveHttpClient)
+            .orElseGet(() -> {
+                DmaapPublisherConfiguration dmaapPublisherConfiguration = resolveConfiguration();
+                return new DMaaPProducerReactiveHttpClient(dmaapPublisherConfiguration).createDMaaPWebClient(
+                    new DMaaPReactiveWebClient.WebClientBuilder()
+                        .dmaapContentType(dmaapPublisherConfiguration.dmaapContentType())
+                        .dmaapUserName(dmaapPublisherConfiguration.dmaapUserName())
+                        .dmaapUserPassword(dmaapPublisherConfiguration.dmaapUserPassword()).build());
+            });
     }
 }
\ No newline at end of file
index e161e3c..2787e64 100644 (file)
@@ -97,13 +97,11 @@ public class ScheduledTasks {
     }
 
     private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
-        return monoAAIModel.flatMap(aaiModel -> {
-            try {
-                return Mono.just(dmaapProducerTask.execute(aaiModel));
-            } catch (PrhTaskException e) {
-                logger.warn("Exception in DMaaPProducer task ", e);
-                return Mono.error(e);
-            }
-        });
+        try {
+            return dmaapProducerTask.execute(monoAAIModel);
+        } catch (PrhTaskException e) {
+            logger.warn("Exception in DMaaPProducer task ", e);
+            return Mono.error(e);
+        }
     }
 }
index 71e132c..c5a9d85 100644 (file)
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.gson.JsonElement;
@@ -32,7 +31,6 @@ import com.google.gson.JsonParser;
 import java.util.Optional;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
@@ -43,7 +41,7 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -54,7 +52,7 @@ class DmaapConsumerTaskImplTest {
 
     private static ConsumerDmaapModel consumerDmaapModel;
     private static DmaapConsumerTaskImpl dmaapConsumerTask;
-    private static DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+    private static DMaaPConsumerReactiveHttpClient dMaaPConsumerReactiveHttpClient;
     private static AppConfig appConfig;
     private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
     private static String message;
@@ -102,22 +100,21 @@ class DmaapConsumerTaskImplTest {
         StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
             .expectError(DmaapEmptyResponseException.class);
 
-        verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaaPConsumerResponse();
+        verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
     }
 
     @Test
     public void whenPassedObjectFits_ReturnsCorrectResponse() throws PrhTaskException {
         //given
         prepareMocksForDmaapConsumer(Optional.of(message));
-
         //when
         Mono<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
 
         //then
-        verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaaPConsumerResponse();
-        Assertions.assertNotNull(response);
+        verify(dMaaPConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
         Assertions.assertEquals(consumerDmaapModel, response.block());
 
+
     }
 
     private void prepareMocksForDmaapConsumer(Optional<String> message) {
@@ -125,11 +122,11 @@ class DmaapConsumerTaskImplTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
-        when(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).thenReturn(Mono.just(message.orElse("")));
+        dMaaPConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
+        when(dMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(Mono.just(message.orElse("")));
         when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
         dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerJsonParser));
         when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
-        doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
+        doReturn(dMaaPConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
     }
 }
\ No newline at end of file
index 225dd3f..97c75ce 100644 (file)
@@ -25,7 +25,7 @@ import static org.mockito.Mockito.spy;
 
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
-import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
@@ -42,10 +42,10 @@ public class DmaapConsumerTaskSpy {
         AppConfig appConfig = spy(AppConfig.class);
         doReturn(mock(DmaapConsumerConfiguration.class)).when(appConfig).getDmaapConsumerConfiguration();
         DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig));
-        DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient = mock(
-            DmaapConsumerReactiveHttpClient.class);
+        DMaaPConsumerReactiveHttpClient DMaaPConsumerReactiveHttpClient = mock(
+            DMaaPConsumerReactiveHttpClient.class);
         doReturn(mock(DmaapConsumerConfiguration.class)).when(dmaapConsumerTask).resolveConfiguration();
-        doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
+        doReturn(DMaaPConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
         return dmaapConsumerTask;
     }
 }
index 0105660..ceb0ed8 100644 (file)
@@ -22,11 +22,10 @@ package org.onap.dcaegen2.services.prh.tasks;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
 
 import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
-import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
@@ -43,8 +42,8 @@ public class DmaapProducerTaskSpy {
         AppConfig appConfig = spy(AppConfig.class);
         doReturn(mock(DmaapPublisherConfiguration.class)).when(appConfig).getDmaapPublisherConfiguration();
         DmaapPublisherTaskImpl dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
-        ExtendedDmaapProducerHttpClientImpl extendedDmaapProducerHttpClient = mock(
-            ExtendedDmaapProducerHttpClientImpl.class);
+        DMaaPProducerReactiveHttpClient extendedDmaapProducerHttpClient = mock(
+            DMaaPProducerReactiveHttpClient.class);
         doReturn(mock(DmaapPublisherConfiguration.class)).when(dmaapPublisherTask).resolveConfiguration();
         doReturn(extendedDmaapProducerHttpClient).when(dmaapPublisherTask).resolveClient();
         return dmaapPublisherTask;
index 13534ce..6b08be5 100644 (file)
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import java.util.Optional;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -36,11 +35,14 @@ import org.junit.jupiter.api.function.Executable;
 import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.prh.config.ImmutableDmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.producer.ExtendedDmaapProducerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
 import org.springframework.http.HttpStatus;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
@@ -49,7 +51,7 @@ class DmaapPublisherTaskImplTest {
 
     private static ConsumerDmaapModel consumerDmaapModel;
     private static DmaapPublisherTaskImpl dmaapPublisherTask;
-    private static ExtendedDmaapProducerHttpClientImpl extendedDmaapProducerHttpClient;
+    private static DMaaPProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
     private static AppConfig appConfig;
     private static DmaapPublisherConfiguration dmaapPublisherConfiguration;
 
@@ -85,38 +87,36 @@ class DmaapPublisherTaskImplTest {
         prepareMocksForTests(HttpStatus.OK.value());
 
         //when
-        Integer response = dmaapPublisherTask.execute(consumerDmaapModel);
+        StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
+            .expectNext(HttpStatus.OK.value());
 
         //then
-        verify(extendedDmaapProducerHttpClient, times(1))
-            .getHttpProducerResponse(any(ConsumerDmaapModel.class));
-        verifyNoMoreInteractions(extendedDmaapProducerHttpClient);
-        Assertions.assertEquals((Integer) HttpStatus.OK.value(), response);
+        verify(dMaaPProducerReactiveHttpClient, times(1))
+            .getDMaaPProducerResponse(any(Mono.class));
+        verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
 
     @Test
-    public void whenPassedObjectFits_butIncorrectResponseReturns() {
+    public void whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException {
         //given
         prepareMocksForTests(HttpStatus.UNAUTHORIZED.value());
 
         //when
-        Executable executableFunction = () -> dmaapPublisherTask.execute(consumerDmaapModel);
+        StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
+            .expectError(PrhTaskException.class);
 
         //then
-        Assertions
-            .assertThrows(PrhTaskException.class, executableFunction, "Incorrect response from DMAAP");
-        verify(extendedDmaapProducerHttpClient, times(1)).getHttpProducerResponse(any(ConsumerDmaapModel.class));
-        verifyNoMoreInteractions(extendedDmaapProducerHttpClient);
+        verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any(Mono.class));
+        verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
 
 
     private void prepareMocksForTests(Integer httpResponseCode) {
-        extendedDmaapProducerHttpClient = mock(ExtendedDmaapProducerHttpClientImpl.class);
-        when(extendedDmaapProducerHttpClient.getHttpProducerResponse(consumerDmaapModel))
-            .thenReturn(Optional.of(httpResponseCode));
-        when(appConfig.getDmaapPublisherConfiguration()).thenReturn(dmaapPublisherConfiguration);
+        dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class);
+        when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any(Mono.class)))
+            .thenReturn(Mono.just(httpResponseCode));
         dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
         when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
-        doReturn(extendedDmaapProducerHttpClient).when(dmaapPublisherTask).resolveClient();
+        doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
     }
 }
\ No newline at end of file
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
new file mode 100644 (file)
index 0000000..a41ec3a
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.service;
+
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+public class DMaaPReactiveWebClient {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private DMaaPReactiveWebClient() {
+    }
+
+    private WebClient create(WebClientBuilder webClientBuilder) {
+        return WebClient.builder()
+            .defaultHeader(HttpHeaders.CONTENT_TYPE, webClientBuilder.dMaaPContentType)
+            .filter(basicAuthentication(webClientBuilder.dMaaPUserName, webClientBuilder.dMaaPUserPassword))
+            .filter(logRequest())
+            .filter(logResponse())
+            .build();
+    }
+
+    ExchangeFilterFunction logResponse() {
+        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+            logger.info("Response Status {}", clientResponse.statusCode());
+            return Mono.just(clientResponse);
+        });
+    }
+
+    ExchangeFilterFunction logRequest() {
+        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+            logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+            clientRequest.headers()
+                .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+            return Mono.just(clientRequest);
+        });
+    }
+
+    public static class WebClientBuilder {
+
+        private String dMaaPContentType;
+        private String dMaaPUserName;
+        private String dMaaPUserPassword;
+
+        public WebClientBuilder() {
+        }
+
+        public WebClientBuilder dmaapContentType(String dmaapContentType) {
+            this.dMaaPContentType = dmaapContentType;
+            return this;
+        }
+
+        public WebClientBuilder dmaapUserName(String dmaapUserName) {
+            this.dMaaPUserName = dmaapUserName;
+            return this;
+        }
+
+        public WebClientBuilder dmaapUserPassword(String dmaapUserPassword) {
+            this.dMaaPUserPassword = dmaapUserPassword;
+            return this;
+        }
+
+        public WebClient build() {
+            return new DMaaPReactiveWebClient().create(this);
+        }
+    }
+}
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapHttpClientImpl.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapHttpClientImpl.java
deleted file mode 100644 (file)
index 309106c..0000000
+++ /dev/null
@@ -1,70 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.service;
-
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.Credentials;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
-import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DmaapHttpClientImpl {
-
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    private final String dmaapHostName;
-    private final Integer dmaapPortNumber;
-    private final String dmaapUserName;
-    private final String dmaapUserPassword;
-
-
-    public DmaapHttpClientImpl(DmaapCustomConfig configuration) {
-        this.dmaapHostName = configuration.dmaapHostName();
-        this.dmaapPortNumber = configuration.dmaapPortNumber();
-        this.dmaapUserName = configuration.dmaapUserName();
-        this.dmaapUserPassword = configuration.dmaapUserPassword();
-    }
-
-    public CloseableHttpClient getHttpClient() {
-
-        logger.info("Preparing closeable http client");
-
-        HttpClientBuilder httpClientBuilder = HttpClients.custom().useSystemProperties();
-
-        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-
-        if (dmaapUserName != null) {
-            final AuthScope dmaapHostPortAuthScope = new AuthScope(dmaapHostName, dmaapPortNumber);
-            final Credentials dmaapCredentials = new UsernamePasswordCredentials(dmaapUserName, dmaapUserPassword);
-            credentialsProvider.setCredentials(dmaapHostPortAuthScope, dmaapCredentials);
-        }
-
-        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
-
-        return httpClientBuilder.build();
-    }
-}
  */
 package org.onap.dcaegen2.services.prh.service.consumer;
 
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
-
 import java.net.URI;
 import java.net.URISyntaxException;
 import org.apache.http.client.utils.URIBuilder;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
  */
-public class DmaapConsumerReactiveHttpClient {
+public class DMaaPConsumerReactiveHttpClient {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -47,32 +43,17 @@ public class DmaapConsumerReactiveHttpClient {
     private final String dmaapTopicName;
     private final String consumerGroup;
     private final String consumerId;
-    private final String dmaapContentType;
-    private final String dmaapUserName;
-    private final String dmaapUserPassword;
 
-    public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
+    public DMaaPConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
         this.dmaapHostName = consumerConfiguration.dmaapHostName();
         this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
         this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
         this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
         this.consumerGroup = consumerConfiguration.consumerGroup();
         this.consumerId = consumerConfiguration.consumerId();
-        this.dmaapContentType = consumerConfiguration.dmaapContentType();
-        this.dmaapUserName = consumerConfiguration.dmaapUserName();
-        this.dmaapUserPassword = consumerConfiguration.dmaapUserPassword();
-    }
-
-    public void initWebClient() {
-        this.webClient = WebClient.builder()
-            .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType)
-            .filter(basicAuthentication(dmaapUserName, dmaapUserPassword))
-            .filter(logRequest())
-            .filter(logResponse())
-            .build();
     }
 
-    public Mono<String> getDmaaPConsumerResponse() {
+    public Mono<String> getDMaaPConsumerResponse() {
         try {
             return webClient
                 .get()
@@ -85,7 +66,7 @@ public class DmaapConsumerReactiveHttpClient {
                     Mono.error(new Exception("HTTP 500")))
                 .bodyToMono(String.class);
         } catch (URISyntaxException e) {
-            logger.warn("Exception while executing HTTP request: ", e);
+            logger.warn("Exception while evaluating URI ");
             return Mono.error(e);
         }
     }
@@ -94,28 +75,13 @@ public class DmaapConsumerReactiveHttpClient {
         return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
     }
 
-    void initWebClient(WebClient webClient) {
+    public DMaaPConsumerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
         this.webClient = webClient;
-    }
-
-    ExchangeFilterFunction logResponse() {
-        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
-            logger.info("Response Status {}", clientResponse.statusCode());
-            return Mono.just(clientResponse);
-        });
+        return this;
     }
 
     URI getUri() throws URISyntaxException {
         return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
             .setPath(createRequestPath()).build();
     }
-
-    ExchangeFilterFunction logRequest() {
-        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
-            logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
-            clientRequest.headers()
-                .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
-            return Mono.just(clientRequest);
-        });
-    }
 }
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
new file mode 100644 (file)
index 0000000..e29ecc6
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.service.producer;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.http.client.utils.URIBuilder;
+import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+public class DMaaPProducerReactiveHttpClient {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private WebClient webClient;
+    private final String dmaapHostName;
+    private final Integer dmaapPortNumber;
+    private final String dmaapProtocol;
+    private final String dmaapTopicName;
+
+    public DMaaPProducerReactiveHttpClient(DmaapPublisherConfiguration dmaapPublisherConfiguration) {
+        this.dmaapHostName = dmaapPublisherConfiguration.dmaapHostName();
+        this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
+        this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
+        this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+    }
+
+    public Mono<String> getDMaaPProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
+        try {
+            return webClient
+                .post()
+                .uri(getUri())
+                .body(BodyInserters.fromObject(consumerDmaapModelMono))
+                .retrieve()
+                .onStatus(HttpStatus::is4xxClientError, clientResponse ->
+                    Mono.error(new Exception("HTTP 400"))
+                )
+                .onStatus(HttpStatus::is5xxServerError, clientResponse ->
+                    Mono.error(new Exception("HTTP 500")))
+                .bodyToMono(String.class);
+        } catch (URISyntaxException e) {
+            logger.warn("Exception while evaluating URI");
+            return Mono.error(e);
+        }
+    }
+
+    public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
+        this.webClient = webClient;
+        return this;
+    }
+
+    URI getUri() throws URISyntaxException {
+        return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+            .setPath(dmaapTopicName).build();
+    }
+
+}
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/ExtendedDmaapProducerHttpClientImpl.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/ExtendedDmaapProducerHttpClientImpl.java
deleted file mode 100644 (file)
index 7bdaab1..0000000
+++ /dev/null
@@ -1,132 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.service.producer;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
-import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.prh.model.CommonFunctions;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
-import org.onap.dcaegen2.services.prh.service.DmaapHttpClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Optional;
-
-public class ExtendedDmaapProducerHttpClientImpl {
-
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    private final CloseableHttpClient closeableHttpClient;
-    private final String dmaapHostName;
-    private final String dmaapProtocol;
-    private final Integer dmaapPortNumber;
-    private final String dmaapTopicName;
-    private final String dmaapContentType;
-    private ConsumerDmaapModel consumerDmaapModel;
-
-
-    public ExtendedDmaapProducerHttpClientImpl(DmaapPublisherConfiguration configuration) {
-        this.closeableHttpClient = new DmaapHttpClientImpl(configuration).getHttpClient();
-        this.dmaapHostName = configuration.dmaapHostName();
-        this.dmaapProtocol = configuration.dmaapProtocol();
-        this.dmaapPortNumber = configuration.dmaapPortNumber();
-        this.dmaapTopicName = configuration.dmaapTopicName();
-        this.dmaapContentType = configuration.dmaapContentType();
-    }
-
-    public Optional<Integer> getHttpProducerResponse(ConsumerDmaapModel consumerDmaapModel) {
-        this.consumerDmaapModel = consumerDmaapModel;
-        try {
-            return createRequest()
-                .flatMap(this::executeHttpClient);
-        } catch (URISyntaxException e) {
-            logger.warn("Exception while executing HTTP request: ", e);
-        }
-        return Optional.empty();
-    }
-
-    private Optional<Integer> executeHttpClient(HttpRequestBase httpRequestBase) {
-        try {
-            return closeableHttpClient.execute(httpRequestBase, this::handleResponse);
-        } catch (IOException e) {
-            logger.warn("Exception while executing HTTP request: ", e);
-        }
-        return Optional.empty();
-    }
-
-    private Optional<HttpRequestBase> createRequest() throws URISyntaxException {
-        return "application/json".equals(dmaapContentType)
-            ? createDmaapPublisherExtendedURI().map(this::createHttpPostRequest)
-            : Optional.empty();
-    }
-
-    private Optional<URI> createDmaapPublisherExtendedURI() throws URISyntaxException {
-        return Optional.ofNullable(new URIBuilder()
-            .setScheme(dmaapProtocol)
-            .setHost(dmaapHostName)
-            .setPort(dmaapPortNumber)
-            .setPath(dmaapTopicName).build());
-    }
-
-    private HttpPost createHttpPostRequest(URI extendedURI) {
-        HttpPost post = new HttpPost(extendedURI);
-        post.addHeader("Content-type", dmaapContentType);
-        createStringEntity().ifPresent(post::setEntity);
-        return post;
-    }
-
-    private Optional<StringEntity> createStringEntity() {
-        try {
-            return Optional.of(new StringEntity(CommonFunctions.createJsonBody(consumerDmaapModel)));
-        } catch (UnsupportedEncodingException | IllegalArgumentException e) {
-            logger.warn("Exception while parsing JSON: ", e);
-        }
-        return Optional.empty();
-    }
-
-    Optional<Integer> handleResponse(HttpResponse response) throws IOException {
-
-        final Integer responseCode = response.getStatusLine().getStatusCode();
-        logger.info("Status code of operation: {}", responseCode);
-        final HttpEntity responseEntity = response.getEntity();
-
-        if (HttpUtils.isSuccessfulResponseCode(responseCode)) {
-            logger.trace("HTTP response successful.");
-            return Optional.of(responseCode);
-        } else {
-            String aaiResponse = responseEntity != null ? EntityUtils.toString(responseEntity) : "";
-            logger.warn("HTTP response not successful : {}", aaiResponse);
-            return Optional.of(responseCode);
-        }
-    }
-}
\ No newline at end of file
@@ -44,9 +44,9 @@ import reactor.test.StepVerifier;
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/27/18
  */
-public class DmaapConsumerReactiveHttpClientTest {
+public class DMaaPConsumerReactiveHttpClientTest {
 
-    private static DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+    private static DMaaPConsumerReactiveHttpClient DMaaPConsumerReactiveHttpClient;
 
     private static DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class);
     private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
@@ -68,13 +68,11 @@ public class DmaapConsumerReactiveHttpClientTest {
         when(consumerConfigurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
         when(consumerConfigurationMock.consumerId()).thenReturn("c12");
 
-        dmaapConsumerReactiveHttpClient = new DmaapConsumerReactiveHttpClient(consumerConfigurationMock);
+        DMaaPConsumerReactiveHttpClient = new DMaaPConsumerReactiveHttpClient(consumerConfigurationMock);
         webClient = spy(WebClient.builder()
             .defaultHeader(HttpHeaders.CONTENT_TYPE, consumerConfigurationMock.dmaapContentType())
             .filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
                 consumerConfigurationMock.dmaapUserPassword()))
-            .filter(dmaapConsumerReactiveHttpClient.logRequest())
-            .filter(dmaapConsumerReactiveHttpClient.logResponse())
             .build());
         requestHeadersSpec = mock(RequestHeadersUriSpec.class);
         responseSpec = mock(ResponseSpec.class);
@@ -89,8 +87,8 @@ public class DmaapConsumerReactiveHttpClientTest {
         //when
         mockDependantObjects();
         doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
-        dmaapConsumerReactiveHttpClient.initWebClient(webClient);
-        Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse();
+        DMaaPConsumerReactiveHttpClient.createDMaaPWebClient(webClient);
+        Mono<String> response = DMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse();
 
         //then
         StepVerifier.create(response).expectSubscription()
@@ -108,11 +106,10 @@ public class DmaapConsumerReactiveHttpClientTest {
         mockDependantObjects();
         doAnswer(invocationOnMock -> Mono.error(new Exception("400")))
             .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("400")));
-        dmaapConsumerReactiveHttpClient.initWebClient();
-        dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+        DMaaPConsumerReactiveHttpClient.createDMaaPWebClient(webClient);
 
         //then
-        StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+        StepVerifier.create(DMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).expectSubscription()
             .expectError(Exception.class);
 
     }
@@ -124,25 +121,24 @@ public class DmaapConsumerReactiveHttpClientTest {
         mockDependantObjects();
         doAnswer(invocationOnMock -> Mono.error(new Exception("500")))
             .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("500")));
-        dmaapConsumerReactiveHttpClient.initWebClient();
-        dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+        DMaaPConsumerReactiveHttpClient.createDMaaPWebClient(webClient);
 
         //then
-        StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+        StepVerifier.create(DMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).expectSubscription()
             .expectError(Exception.class);
     }
 
     @Test
     public void getHttpResponse_whenURISyntaxExceptionHasBeenThrown() throws URISyntaxException {
         //given
-        dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient);
+        DMaaPConsumerReactiveHttpClient = spy(DMaaPConsumerReactiveHttpClient);
         //when
         when(webClient.get()).thenReturn(requestHeadersSpec);
-        dmaapConsumerReactiveHttpClient.initWebClient(webClient);
-        when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
+        DMaaPConsumerReactiveHttpClient.createDMaaPWebClient(webClient);
+        when(DMaaPConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
 
         //then
-        StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+        StepVerifier.create(DMaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()).expectSubscription()
             .expectError(Exception.class);
     }
 
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java
new file mode 100644 (file)
index 0000000..c0b0c40
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.service.producer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
+import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
+ */
+public class DMaaPProducerReactiveHttpClientTest {
+
+    private static DMaaPProducerReactiveHttpClient dMaaPProducerReactiveHttpClient;
+
+    private static DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(
+        DmaapPublisherConfiguration.class);
+    private static final Integer RESPONSE_SUCCESS = 200;
+    private static ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
+    private static Mono<Integer> expectedResult = Mono.empty();
+    private static WebClient webClient = mock(WebClient.class);
+    private static RequestBodyUriSpec requestBodyUriSpec;
+    private static ResponseSpec responseSpec;
+
+
+    @BeforeAll
+    public static void setUp() {
+        when(dmaapPublisherConfigurationMock.dmaapHostName()).thenReturn("54.45.33.2");
+        when(dmaapPublisherConfigurationMock.dmaapProtocol()).thenReturn("https");
+        when(dmaapPublisherConfigurationMock.dmaapPortNumber()).thenReturn(1234);
+        when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("PRH");
+        when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
+        when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json");
+        when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady");
+
+        dMaaPProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
+
+        webClient = spy(WebClient.builder()
+            .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType())
+            .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(),
+                dmaapPublisherConfigurationMock.dmaapUserPassword()))
+            .build());
+        requestBodyUriSpec = mock(RequestBodyUriSpec.class);
+        responseSpec = mock(ResponseSpec.class);
+    }
+
+    @Test
+    public void getHttpResponse_Success() {
+        //given
+        expectedResult = Mono.just(RESPONSE_SUCCESS);
+
+        //when
+        mockWebClientDependantObject();
+        doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
+        dMaaPProducerReactiveHttpClient.createDMaaPWebClient(webClient);
+        Mono<String> response = dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(Mono.just(consumerDmaapModel));
+
+        //then
+        Assertions.assertEquals(response.block(), expectedResult.block());
+    }
+
+    @Test
+    public void getHttpResponse_HttpResponse4xxClientError() {
+        //when
+        mockWebClientDependantObject();
+
+        doAnswer(invocationOnMock -> Mono.error(new Exception("400")))
+            .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("400")));
+        dMaaPProducerReactiveHttpClient.createDMaaPWebClient(webClient);
+
+        //then
+        StepVerifier.create(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(Mono.just(consumerDmaapModel)))
+            .expectSubscription()
+            .expectError(Exception.class);
+
+    }
+
+    @Test
+    public void getHttpResponse_HttpResponse5xxClientError() {
+
+        //when
+        mockWebClientDependantObject();
+        doAnswer(invocationOnMock -> Mono.error(new Exception("500")))
+            .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("500")));
+        dMaaPProducerReactiveHttpClient.createDMaaPWebClient(webClient);
+
+        //then
+        StepVerifier.create(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(Mono.just(consumerDmaapModel)))
+            .expectSubscription()
+            .expectError(Exception.class);
+    }
+
+    @Test
+    public void getHttpResponse_whenURISyntaxExceptionHasBeenThrown() throws URISyntaxException {
+        //given
+        dMaaPProducerReactiveHttpClient = spy(dMaaPProducerReactiveHttpClient);
+        //when
+        when(webClient.post()).thenReturn(requestBodyUriSpec);
+        dMaaPProducerReactiveHttpClient.createDMaaPWebClient(webClient);
+        when(dMaaPProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
+
+        //then
+        StepVerifier.create(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any())).expectSubscription()
+            .expectError(Exception.class);
+    }
+
+    private void mockWebClientDependantObject() {
+        RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class);
+        when(webClient.post()).thenReturn(requestBodyUriSpec);
+        when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec);
+        when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec);
+        doReturn(responseSpec).when(requestHeadersSpec).retrieve();
+        doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+    }
+}
\ No newline at end of file
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/ExtendedDmaapProducerHttpClientImplTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/ExtendedDmaapProducerHttpClientImplTest.java
deleted file mode 100644 (file)
index d9e7426..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.service.producer;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.StatusLine;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Optional;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-class ExtendedDmaapProducerHttpClientImplTest {
-
-    private static ExtendedDmaapProducerHttpClientImpl objectUnderTest;
-    private static DmaapPublisherConfiguration configurationMock = mock(DmaapPublisherConfiguration.class);
-    private static CloseableHttpClient closeableHttpClientMock = mock(CloseableHttpClient.class);
-    private static ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
-    private static Integer expectedResult;
-    private static final Integer RESPONSE_SUCCESS = 200;
-    private static final Integer RESPONSE_FAILURE = 404;
-    private final static HttpResponse httpResponseMock = mock(HttpResponse.class);
-    private final static HttpEntity httpEntityMock = mock(HttpEntity.class);
-    private final static StatusLine statusLineMock = mock(StatusLine.class);
-
-
-    @BeforeAll
-    static void init() throws NoSuchFieldException, IllegalAccessException {
-        when(configurationMock.dmaapHostName()).thenReturn("54.45.33.2");
-        when(configurationMock.dmaapProtocol()).thenReturn("https");
-        when(configurationMock.dmaapPortNumber()).thenReturn(1234);
-        when(configurationMock.dmaapUserName()).thenReturn("PRH");
-        when(configurationMock.dmaapUserPassword()).thenReturn("PRH");
-        when(configurationMock.dmaapContentType()).thenReturn("application/json");
-        when(configurationMock.dmaapTopicName()).thenReturn("pnfReady");
-        objectUnderTest = new ExtendedDmaapProducerHttpClientImpl(configurationMock);
-        setField();
-    }
-
-
-    @Test
-    void getHttpResponsePost_success() throws IOException {
-        expectedResult = RESPONSE_SUCCESS;
-        when(closeableHttpClientMock.execute(any(HttpPost.class), any(ResponseHandler.class)))
-            .thenReturn(Optional.of(expectedResult));
-        Optional<Integer> actualResult = objectUnderTest.getHttpProducerResponse(consumerDmaapModel);
-        Assertions.assertEquals(expectedResult, actualResult.get());
-    }
-
-    @Test
-    void getExtendedDetails_returnsFailure() throws IOException {
-        expectedResult = RESPONSE_FAILURE;
-        when(closeableHttpClientMock.execute(any(HttpPost.class), any(ResponseHandler.class)))
-            .thenReturn(Optional.of(expectedResult));
-        Optional<Integer> actualResult = objectUnderTest.getHttpProducerResponse(consumerDmaapModel);
-        Assertions.assertEquals(expectedResult, actualResult.get());
-    }
-
-    @Test
-    void handleResponse_shouldReturn200() throws IOException {
-        // When
-        when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
-        when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
-        when(httpResponseMock.getStatusLine().getStatusCode()).thenReturn(HttpStatus.SC_OK);
-        // Then
-        assertEquals(Optional.of(HttpStatus.SC_OK), objectUnderTest.handleResponse(httpResponseMock));
-    }
-
-    @Test
-    void handleResponse_shouldReturn300() throws IOException {
-        // When
-        when(httpResponseMock.getEntity()).thenReturn(httpEntityMock);
-        when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock);
-        when(httpResponseMock.getStatusLine().getStatusCode()).thenReturn(HttpStatus.SC_BAD_REQUEST);
-        // Then
-        assertEquals(Optional.of(HttpStatus.SC_BAD_REQUEST), objectUnderTest.handleResponse(httpResponseMock));
-    }
-
-    private static void setField() throws NoSuchFieldException, IllegalAccessException {
-        Field field = objectUnderTest.getClass().getDeclaredField("closeableHttpClient");
-        field.setAccessible(true);
-        field.set(objectUnderTest, closeableHttpClientMock);
-    }
-}