Refactor Optional in MonoResponse 91/59091/7
authorwasala <przemyslaw.wasala@nokia.com>
Wed, 27 Jun 2018 12:29:06 +0000 (14:29 +0200)
committerwasala <przemyslaw.wasala@nokia.com>
Tue, 7 Aug 2018 07:18:01 +0000 (09:18 +0200)
Added JUnit tests for DmaapConsumer
Reactive Client.

Added property for spring to run
PRH as none-web application

Change-Id: I8b762389927151387da5b79e8b75b670600ee5e8
Issue-ID: DCAEGEN2-563
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
13 files changed:
prh-app-server/config/application.yaml
prh-app-server/pom.xml
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/main/resources/application.properties
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
prh-dmaap-client/pom.xml
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java [new file with mode: 0644]
prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java [deleted file]

index 306c94f..4c8c3ef 100644 (file)
@@ -1,6 +1,8 @@
 spring:
   profiles:
     active: prod
+  main:
+    web-application-type: none
 server:
   port: 8433
   ssl:
index dbe3d1b..ed24fde 100644 (file)
       <groupId>io.springfox</groupId>
       <artifactId>springfox-swagger-ui</artifactId>
     </dependency>
-
-    <dependency>
-      <groupId>org.onap.dcaegen2.services.prh</groupId>
-      <artifactId>prh-aai-client</artifactId>
-      <version>1.0.0-SNAPSHOT</version>
-    </dependency>
-
-
   </dependencies>
   <dependencyManagement>
     <dependencies>
index 22acf54..1d215c6 100644 (file)
@@ -30,6 +30,7 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
 import reactor.core.publisher.Mono;
 
 /**
@@ -46,21 +47,20 @@ public class DmaapConsumerJsonParser {
     private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
 
 
-    public Mono<Optional<ConsumerDmaapModel>> getJsonObject(Mono<Optional<String>> monoMessage) {
+    public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
         return monoMessage.flatMap(message ->
         {
-            if (message.isPresent()) {
-                JsonElement jsonElement = new JsonParser().parse(message.orElse(""));
-                Optional<ConsumerDmaapModel> consumerDmaapModel;
+            if (!StringUtils.isEmpty(message)) {
+                JsonElement jsonElement = new JsonParser().parse(message);
+                ConsumerDmaapModel consumerDmaapModel;
                 try {
                     if (jsonElement.isJsonObject()) {
-                        consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
+                        consumerDmaapModel = create(jsonElement.getAsJsonObject());
                     } else {
-                        consumerDmaapModel = Optional
-                            .of(create(
-                                StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
-                                    .flatMap(this::getJsonObjectFromAnArray)
-                                    .orElseThrow(DmaapEmptyResponseException::new)));
+                        consumerDmaapModel = create(
+                            StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+                                .flatMap(this::getJsonObjectFromAnArray)
+                                .orElseThrow(DmaapEmptyResponseException::new));
                     }
                     logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
                     return Mono.just(consumerDmaapModel);
index 753d1f9..5cd30f8 100644 (file)
@@ -30,11 +30,11 @@ import reactor.core.publisher.Mono;
  */
 abstract class DmaapConsumerTask {
 
-    abstract Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException;
+    abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
 
     abstract DmaapConsumerReactiveHttpClient resolveClient();
 
     abstract void initConfigs();
 
-    protected abstract Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException;
+    protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
 }
index 3181c06..08008f0 100644 (file)
@@ -40,7 +40,6 @@ import reactor.core.publisher.Mono;
 @Component
 public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
-
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Config prhAppConfig;
     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
@@ -57,17 +56,16 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
         this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
     }
 
-
     @Override
-    Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException {
+    Mono<ConsumerDmaapModel> consume(Mono<String> message) {
         logger.info("Consumed model from DmaaP: {}", message);
         return dmaapConsumerJsonParser.getJsonObject(message);
     }
 
     @Override
-    public Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException {
+    public Mono<ConsumerDmaapModel> execute(String object) {
         dmaapConsumerReactiveHttpClient = resolveClient();
-//        dmaapConsumerReactiveHttpClient.initWebClient();
+        dmaapConsumerReactiveHttpClient.initWebClient();
         logger.trace("Method called with arg {}", object);
         return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()));
     }
index 6fa986e..e161e3c 100644 (file)
@@ -77,7 +77,7 @@ public class ScheduledTasks {
         }
     }
 
-    private Callable<Mono<Optional<ConsumerDmaapModel>>> consumeFromDMaaPMessage() {
+    private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
         return () ->
         {
             dmaapConsumerTask.initConfigs();
@@ -85,10 +85,10 @@ public class ScheduledTasks {
         };
     }
 
-    private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<Optional<ConsumerDmaapModel>> monoDMaaPModel) {
+    private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
         return monoDMaaPModel.flatMap(dmaapModel -> {
             try {
-                return Mono.just(aaiProducerTask.execute(dmaapModel.get()));
+                return Mono.just(aaiProducerTask.execute(dmaapModel));
             } catch (PrhTaskException e) {
                 logger.warn("Exception in A&AIProducer task ", e);
                 return Mono.error(e);
index 53fa9cd..2593b3d 100644 (file)
@@ -1,4 +1,5 @@
 spring.profiles.active=prod
+spring.main.web-application-type=none
 server.port=8433
 server.ssl.key-store-type=PKCS12
 server.ssl.key-store-password=nokiapnf
index f24ef41..9e7edc4 100644 (file)
@@ -38,34 +38,6 @@ import reactor.test.StepVerifier;
  */
 class DmaapConsumerJsonParserTest {
 
-    private String incorrectMessage =
-        "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\""
-            + ",\"eventName\":\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\"internalHeaderFields\":{},"
-            + "\"lastEpochMicrosec\":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\":\"5DU\",\"priority\""
-            + ":\"Normal\",\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\","
-            + "\"sourceName\":\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3}}}]";
-
-    private String jsonWithoutPnfVendorAndSerialNumber =
-        "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":"
-            + "\"<<SerialNumber>>-reg\",\"eventName\":\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\""
-            + "internalHeaderFields\":{},\"lastEpochMicrosec\":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\""
-            + ":\"5DU\",\"priority\":\"Normal\",reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\""
-            + "<<SerialNumber>>\",\"sourceName\":\"5GRAN_DU\",startEpochMicrosec\":1519837825682,\"version\":3},"
-            + "\"otherFields\":{\"otherFieldsVersion\":1,\"pnfFamily\":\"BBU\",\"pnfLastServiceDate\":1517206400,"
-            + "\"pnfManufactureDate\":1516406400,\"pnfModelNumber\":\"AJ02\",\"pnfOamIpv4Address\":"
-            + "\"10.16.123.234\",\"pnfOamIpv6Address\":\"0:0:0:0:0:FFFF:0A10:7BEA\",\"pnfSoftwareVersion\":\"v4.5.0.1\","
-            + "\"pnfType\":\"AirScale\"}}}]";
-
-    private String jsonWithoutIPInformation =
-        "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\":"
-            + "\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\"internalHeaderFields\":{},\"lastEpochMicrosec\""
-            + ":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\":\"5DU\",\"priority\":\"Normal\","
-            + "\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\",\"sourceName\":"
-            + "\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3},\"otherFields\":{\"otherFieldsVersion\":1,"
-            + "\"pnfFamily\":\"BBU\",\"pnfLastServiceDate\":1517206400,\"pnfManufactureDate\":1516406400,\"pnfModelNumber\":"
-            + "\"AJ02\",\"pnfSerialNumber\":\"QTFCOC540002E\",\"pnfSoftwareVersion\":\"v4.5.0.1\",\"pnfType\":\"AirScale\","
-            + "\"pnfVendorName\":\"Nokia\"}}}]";
-
     @Test
     void whenPassingCorrectJson_validationNotThrowingAnException() {
         //given
@@ -99,7 +71,7 @@ class DmaapConsumerJsonParserTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
-            .getJsonObject(Mono.just(Optional.of(message))).block().get();
+            .getJsonObject(Mono.just((message))).block();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -138,8 +110,8 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message)))
-            .block().get();
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((message)))
+            .block();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -176,8 +148,8 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message)))
-            .block().get();
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just((message)))
+            .block();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -207,7 +179,7 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message))))
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message)))
             .expectSubscription().expectError(DmaapNotFoundException.class);
 
     }
@@ -223,7 +195,13 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(incorrectMessage))))
+        String incorrectMessage =
+            "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\""
+                + ",\"eventName\":\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\"internalHeaderFields\":{},"
+                + "\"lastEpochMicrosec\":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\":\"5DU\",\"priority\""
+                + ":\"Normal\",\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\","
+                + "\"sourceName\":\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3}}}]";
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage)))
             .expectSubscription().expectError(DmaapNotFoundException.class);
     }
 
@@ -242,8 +220,18 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
+        String jsonWithoutPnfVendorAndSerialNumber =
+            "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":"
+                + "\"<<SerialNumber>>-reg\",\"eventName\":\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\""
+                + "internalHeaderFields\":{},\"lastEpochMicrosec\":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\""
+                + ":\"5DU\",\"priority\":\"Normal\",reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\""
+                + "<<SerialNumber>>\",\"sourceName\":\"5GRAN_DU\",startEpochMicrosec\":1519837825682,\"version\":3},"
+                + "\"otherFields\":{\"otherFieldsVersion\":1,\"pnfFamily\":\"BBU\",\"pnfLastServiceDate\":1517206400,"
+                + "\"pnfManufactureDate\":1516406400,\"pnfModelNumber\":\"AJ02\",\"pnfOamIpv4Address\":"
+                + "\"10.16.123.234\",\"pnfOamIpv6Address\":\"0:0:0:0:0:FFFF:0A10:7BEA\",\"pnfSoftwareVersion\":\"v4.5.0.1\","
+                + "\"pnfType\":\"AirScale\"}}}]";
         StepVerifier
-            .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(jsonWithoutPnfVendorAndSerialNumber))))
+            .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutPnfVendorAndSerialNumber)))
             .expectSubscription().expectError(DmaapNotFoundException.class);
     }
 
@@ -262,7 +250,16 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(jsonWithoutIPInformation))))
+        String jsonWithoutIPInformation =
+            "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\":"
+                + "\"pnfRegistration_5GDU\",\"eventType\":\"pnfRegistration\",\"internalHeaderFields\":{},\"lastEpochMicrosec\""
+                + ":1519837825682,\"nfNamingCode\":\"5GRAN\",\"nfcNamingCode\":\"5DU\",\"priority\":\"Normal\","
+                + "\"reportingEntityName\":\"5GRAN_DU\",\"sequence\":0,\"sourceId\":\"<<SerialNumber>>\",\"sourceName\":"
+                + "\"5GRAN_DU\",\"startEpochMicrosec\":1519837825682,\"version\":3},\"otherFields\":{\"otherFieldsVersion\":1,"
+                + "\"pnfFamily\":\"BBU\",\"pnfLastServiceDate\":1517206400,\"pnfManufactureDate\":1516406400,\"pnfModelNumber\":"
+                + "\"AJ02\",\"pnfSerialNumber\":\"QTFCOC540002E\",\"pnfSoftwareVersion\":\"v4.5.0.1\",\"pnfType\":\"AirScale\","
+                + "\"pnfVendorName\":\"Nokia\"}}}]";
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIPInformation)))
             .expectSubscription().expectError(DmaapNotFoundException.class);
     }
 }
index f9d7c7f..71e132c 100644 (file)
@@ -103,22 +103,20 @@ class DmaapConsumerTaskImplTest {
             .expectError(DmaapEmptyResponseException.class);
 
         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaaPConsumerResponse();
-        verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
     }
 
     @Test
     public void whenPassedObjectFits_ReturnsCorrectResponse() throws PrhTaskException {
         //given
         prepareMocksForDmaapConsumer(Optional.of(message));
-        //when
 
-        Mono<Optional<ConsumerDmaapModel>> response = dmaapConsumerTask.execute("Sample input");
+        //when
+        Mono<ConsumerDmaapModel> response = dmaapConsumerTask.execute("Sample input");
 
         //then
         verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaaPConsumerResponse();
-        verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
         Assertions.assertNotNull(response);
-        Assertions.assertEquals(consumerDmaapModel, response.block().get());
+        Assertions.assertEquals(consumerDmaapModel, response.block());
 
     }
 
@@ -128,7 +126,7 @@ class DmaapConsumerTaskImplTest {
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
         dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
-        when(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).thenReturn(Mono.just(message));
+        when(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).thenReturn(Mono.just(message.orElse("")));
         when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
         dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerJsonParser));
         when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
index 9234518..0633b46 100644 (file)
       <version>5.0.5.RELEASE</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-reactor-netty</artifactId>
+      <version>2.0.4.RELEASE</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.projectreactor</groupId>
+      <artifactId>reactor-test</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
index a99833d..cb7d5af 100644 (file)
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-
 package org.onap.dcaegen2.services.prh.service.consumer;
 
 import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Optional;
 import org.apache.http.client.utils.URIBuilder;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.slf4j.Logger;
@@ -49,6 +47,9 @@ public class DmaapConsumerReactiveHttpClient {
     private final String dmaapTopicName;
     private final String consumerGroup;
     private final String consumerId;
+    private final String dmaapContentType;
+    private final String dmaapUserName;
+    private final String dmaapUserPassword;
 
     public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
         this.dmaapHostName = consumerConfiguration.dmaapHostName();
@@ -57,17 +58,21 @@ public class DmaapConsumerReactiveHttpClient {
         this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
         this.consumerGroup = consumerConfiguration.consumerGroup();
         this.consumerId = consumerConfiguration.consumerId();
-        String dmaapContentType = consumerConfiguration.dmaapContentType();
+        this.dmaapContentType = consumerConfiguration.dmaapContentType();
+        this.dmaapUserName = consumerConfiguration.dmaapUserName();
+        this.dmaapUserPassword = consumerConfiguration.dmaapUserPassword();
+    }
+
+    public void initWebClient() {
         this.webClient = WebClient.builder()
             .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType)
-            .filter(
-                basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword()))
+            .filter(basicAuthentication(dmaapUserName, dmaapUserPassword))
             .filter(logRequest())
             .filter(logResponse())
             .build();
     }
 
-    public Mono<Optional<String>> getDmaaPConsumerResponse() {
+    public Mono<String> getDmaaPConsumerResponse() {
         try {
             return webClient
                 .get()
@@ -78,31 +83,34 @@ public class DmaapConsumerReactiveHttpClient {
                 )
                 .onStatus(HttpStatus::is5xxServerError, clientResponse ->
                     Mono.error(new Exception("HTTP 500")))
-                .bodyToMono(String.class)
-                .map(Optional::of);
+                .bodyToMono(String.class);
         } catch (URISyntaxException e) {
             logger.warn("Exception while executing HTTP request: ", e);
             return Mono.error(e);
         }
     }
 
-    private URI getUri() throws URISyntaxException {
-        return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
-            .setPath(createRequestPath()).build();
-    }
-
     private String createRequestPath() {
         return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
     }
 
-    private ExchangeFilterFunction logResponse() {
+    void initWebClient(WebClient webClient) {
+        this.webClient = webClient;
+    }
+
+    ExchangeFilterFunction logResponse() {
         return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
             logger.info("Response Status {}", clientResponse.statusCode());
             return Mono.just(clientResponse);
         });
     }
 
-    private ExchangeFilterFunction logRequest() {
+    URI getUri() throws URISyntaxException {
+        return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+            .setPath(createRequestPath()).build();
+    }
+
+    ExchangeFilterFunction logRequest() {
         return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
             logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
             clientRequest.headers()
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClientTest.java
new file mode 100644 (file)
index 0000000..6396660
--- /dev/null
@@ -0,0 +1,156 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.service.consumer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/27/18
+ */
+public class DmaapConsumerReactiveHttpClientTest {
+
+    private static DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+
+    private static DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class);
+    private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}";
+    private static Mono<String> expectedResult = Mono.empty();
+    private static WebClient webClient = mock(WebClient.class);
+    private static RequestHeadersUriSpec requestHeadersSpec;
+    private static ResponseSpec responseSpec;
+
+
+    @BeforeAll
+    public static void setUp() {
+        when(consumerConfigurationMock.dmaapHostName()).thenReturn("54.45.33.2");
+        when(consumerConfigurationMock.dmaapProtocol()).thenReturn("https");
+        when(consumerConfigurationMock.dmaapPortNumber()).thenReturn(1234);
+        when(consumerConfigurationMock.dmaapUserName()).thenReturn("PRH");
+        when(consumerConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
+        when(consumerConfigurationMock.dmaapContentType()).thenReturn("application/json");
+        when(consumerConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
+        when(consumerConfigurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
+        when(consumerConfigurationMock.consumerId()).thenReturn("c12");
+
+        dmaapConsumerReactiveHttpClient = new DmaapConsumerReactiveHttpClient(consumerConfigurationMock);
+        webClient = spy(WebClient.builder()
+            .defaultHeader(HttpHeaders.CONTENT_TYPE, consumerConfigurationMock.dmaapContentType())
+            .filter(basicAuthentication(consumerConfigurationMock.dmaapUserName(),
+                consumerConfigurationMock.dmaapUserPassword()))
+            .filter(dmaapConsumerReactiveHttpClient.logRequest())
+            .filter(dmaapConsumerReactiveHttpClient.logResponse())
+            .build());
+        requestHeadersSpec = mock(RequestHeadersUriSpec.class);
+        responseSpec = mock(ResponseSpec.class);
+    }
+
+
+    @Test
+    public void getHttpResponse_Success() {
+        //given
+        expectedResult = Mono.just(JSON_MESSAGE);
+
+        //when
+        mockDependantObjects();
+        doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
+        dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+        Mono<String> response = dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse();
+
+        //then
+        StepVerifier.create(response).expectSubscription()
+            .expectNextMatches(results -> {
+                Assertions.assertEquals(results, expectedResult.block());
+                return true;
+            }).verifyComplete();
+    }
+
+
+    @Test
+    public void getHttpResponse_HttpResponse4xxClientError() {
+
+        //when
+        mockDependantObjects();
+        doAnswer(invocationOnMock -> Mono.error(new Exception("400")))
+            .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("400")));
+        dmaapConsumerReactiveHttpClient.initWebClient();
+        dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+
+        //then
+        StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+            .expectError(Exception.class);
+
+    }
+
+    @Test
+    public void getHttpResponse_HttpResponse5xxClientError() {
+
+        //when
+        mockDependantObjects();
+        doAnswer(invocationOnMock -> Mono.error(new Exception("500")))
+            .when(responseSpec).onStatus(HttpStatus::is4xxClientError, e -> Mono.error(new Exception("500")));
+        dmaapConsumerReactiveHttpClient.initWebClient();
+        dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+
+        //then
+        StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+            .expectError(Exception.class);
+    }
+
+    @Test
+    public void getHttpResponse_whenURISyntaxExceptionHasBeenThrown() throws URISyntaxException {
+        //given
+        dmaapConsumerReactiveHttpClient = spy(dmaapConsumerReactiveHttpClient);
+        //when
+        when(webClient.get()).thenReturn(requestHeadersSpec);
+        dmaapConsumerReactiveHttpClient.initWebClient(webClient);
+        when(dmaapConsumerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
+
+        //then
+        StepVerifier.create(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).expectSubscription()
+            .expectError(Exception.class);
+    }
+
+    private void mockDependantObjects() {
+        when(webClient.get()).thenReturn(requestHeadersSpec);
+        when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
+        when(requestHeadersSpec.retrieve()).thenReturn(responseSpec);
+        doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+    }
+
+}
\ No newline at end of file
diff --git a/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java b/prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java
deleted file mode 100644 (file)
index d2c0e77..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-///*-
-// * ============LICENSE_START=======================================================
-// * PNF-REGISTRATION-HANDLER
-// * ================================================================================
-// * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
-// * ================================================================================
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *      http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// * ============LICENSE_END=========================================================
-// */
-//
-//package org.onap.dcaegen2.services.prh.service.consumer;
-//
-//import org.apache.http.client.ResponseHandler;
-//import org.apache.http.client.methods.HttpGet;
-//import org.apache.http.impl.client.CloseableHttpClient;
-//import org.junit.jupiter.api.Assertions;
-//import org.junit.jupiter.api.BeforeAll;
-//import org.junit.jupiter.api.Test;
-//import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-//
-//import java.io.IOException;
-//import java.lang.reflect.Field;
-//import java.util.Optional;
-//
-//import static org.mockito.ArgumentMatchers.any;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.when;
-//
-//
-//public class ExtendedDmaapConsumerHttpClientImplTest {
-//
-//    private static ExtendedDmaapConsumerHttpClientImpl objectUnderTest;
-//
-//    private static DmaapConsumerConfiguration configurationMock = mock(DmaapConsumerConfiguration.class);
-//    private static CloseableHttpClient closeableHttpClientMock = mock(CloseableHttpClient.class);
-//
-//    private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\" }";
-//
-//    private static Optional<String> expectedResult = Optional.empty();
-//
-//    @BeforeAll
-//    public static void init() throws NoSuchFieldException, IllegalAccessException {
-//
-//        when(configurationMock.dmaapHostName()).thenReturn("54.45.33.2");
-//        when(configurationMock.dmaapProtocol()).thenReturn("https");
-//        when(configurationMock.dmaapPortNumber()).thenReturn(1234);
-//        when(configurationMock.dmaapUserName()).thenReturn("PRH");
-//        when(configurationMock.dmaapUserPassword()).thenReturn("PRH");
-//        when(configurationMock.dmaapContentType()).thenReturn("application/json");
-//        when(configurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
-//        when(configurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
-//        when(configurationMock.consumerId()).thenReturn("c12");
-//
-//        objectUnderTest = new ExtendedDmaapConsumerHttpClientImpl(configurationMock);
-//
-//        setField();
-//    }
-//
-//
-//    @Test
-//    public void getHttpResponseGet_success() throws IOException {
-//        expectedResult = Optional.of(JSON_MESSAGE);
-//
-//        when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class)))
-//                .thenReturn(expectedResult);
-//
-//        Optional<String> actualResult = objectUnderTest.getHttpConsumerResponse();
-//
-//        Assertions.assertEquals(expectedResult.get(), actualResult.get());
-//    }
-//
-//    @Test
-//    public void getExtendedDetails_returnsNull() throws IOException {
-//        when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class))).
-//                thenReturn(Optional.empty());
-//        Optional<String>  actualResult = objectUnderTest.getHttpConsumerResponse();
-//        Assertions.assertEquals(Optional.empty(),actualResult);
-//    }
-//
-//
-//    private static void setField() throws NoSuchFieldException, IllegalAccessException {
-//        Field field = objectUnderTest.getClass().getDeclaredField("closeableHttpClient");
-//        field.setAccessible(true);
-//        field.set(objectUnderTest, closeableHttpClientMock);
-//    }
-//}