Added dmaapReactiveConsumer 87/59087/3
authorwasala <przemyslaw.wasala@nokia.com>
Tue, 26 Jun 2018 13:15:03 +0000 (15:15 +0200)
committerwasala <przemyslaw.wasala@nokia.com>
Tue, 7 Aug 2018 06:21:02 +0000 (08:21 +0200)
*Tests have not been ready yet

Change-Id: I2e1d9c4218f91ae2f066b28acdbaa1870d7d27e7
Issue-ID: DCAEGEN2-557
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/IT/ScheduledXmlContextITest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
prh-dmaap-client/pom.xml
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java [new file with mode: 0644]
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java

index ee42ce4..20ec78f 100644 (file)
@@ -26,11 +26,12 @@ import java.util.Optional;
 import java.util.stream.StreamSupport;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -46,19 +47,29 @@ public class DmaapConsumerJsonParser {
     private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
 
 
-    public Optional<ConsumerDmaapModel> getJsonObject(String message) throws PrhTaskException {
-        JsonElement jsonElement = new JsonParser().parse(message);
-        Optional<ConsumerDmaapModel> consumerDmaapModel;
-        if (jsonElement.isJsonObject()) {
-            consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
-        } else {
-            consumerDmaapModel = Optional
-                .of(create(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
-                    .flatMap(this::getJsonObjectFromAnArray)
-                    .orElseThrow(DmaapEmptyResponseException::new)));
-        }
-        logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
-        return consumerDmaapModel;
+    public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+        return monoMessage.flatMap(message ->
+        {
+            if (!StringUtils.isEmpty(message)) {
+                JsonElement jsonElement = new JsonParser().parse(message);
+                ConsumerDmaapModel consumerDmaapModel;
+                try {
+                    if (jsonElement.isJsonObject()) {
+                        consumerDmaapModel = create(jsonElement.getAsJsonObject());
+                    } else {
+                        consumerDmaapModel = create(
+                            StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+                                .flatMap(this::getJsonObjectFromAnArray)
+                                .orElseThrow(DmaapEmptyResponseException::new));
+                    }
+                    logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
+                    return Mono.just(consumerDmaapModel);
+                } catch (DmaapNotFoundException | DmaapEmptyResponseException e) {
+                    return Mono.error(e);
+                }
+            }
+            return Mono.error(new DmaapEmptyResponseException());
+        });
     }
 
     public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
@@ -101,5 +112,4 @@ public class DmaapConsumerJsonParser {
     private boolean containsHeader(JsonObject jsonObject) {
         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS);
     }
-
 }
index 1be3b28..d238b34 100644 (file)
@@ -21,18 +21,19 @@ package org.onap.dcaegen2.services.prh.tasks;
 
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
 abstract class DmaapConsumerTask {
 
-    abstract ConsumerDmaapModel consume(String message) throws PrhTaskException;
+    abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
 
-    abstract ExtendedDmaapConsumerHttpClientImpl resolveClient();
+    abstract DmaapConsumerReactiveHttpClient resolveClient();
 
     abstract void initConfigs();
 
-    protected abstract ConsumerDmaapModel execute(String object) throws PrhTaskException;
+    protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
 }
index 3944d41..564a7a4 100644 (file)
@@ -23,15 +23,17 @@ import java.util.Optional;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -41,8 +43,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Config prhAppConfig;
-    private ExtendedDmaapConsumerHttpClientImpl extendedDmaapConsumerHttpClient;
     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+    private DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
 
     @Autowired
     public DmaapConsumerTaskImpl(AppConfig prhAppConfig) {
@@ -57,18 +59,18 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
 
     @Override
-    ConsumerDmaapModel consume(String message) throws PrhTaskException {
-        logger.info("Consumed model from DMaaP: {}", message);
-        return dmaapConsumerJsonParser.getJsonObject(message)
-            .orElseThrow(() -> new DmaapNotFoundException("Null response from JSON Object in single request"));
+    Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+        logger.info("Consumed model from DmaaP: {}", message);
+        return dmaapConsumerJsonParser.getJsonObject(message);
     }
 
+
     @Override
-    public ConsumerDmaapModel execute(String object) throws PrhTaskException {
-        extendedDmaapConsumerHttpClient = resolveClient();
+    public Mono<ConsumerDmaapModel> execute(String object) {
+        dmaapConsumerReactiveHttpClient = resolveClient();
+//        dmaapConsumerReactiveHttpClient.initWebClient();
         logger.trace("Method called with arg {}", object);
-        return consume((extendedDmaapConsumerHttpClient.getHttpConsumerResponse().orElseThrow(() ->
-            new PrhTaskException("DMaaPConsumerTask has returned null"))));
+        return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResposne()));
     }
 
     @Override
@@ -81,8 +83,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
     }
 
     @Override
-    ExtendedDmaapConsumerHttpClientImpl resolveClient() {
-        return Optional.ofNullable(extendedDmaapConsumerHttpClient)
-            .orElseGet(() -> new ExtendedDmaapConsumerHttpClientImpl(resolveConfiguration()));
+    DmaapConsumerReactiveHttpClient resolveClient() {
+        return Optional.ofNullable(dmaapConsumerReactiveHttpClient)
+            .orElseGet(() -> new DmaapConsumerReactiveHttpClient(resolveConfiguration()));
     }
-}
\ No newline at end of file
+}
index cf096b7..37b8686 100644 (file)
@@ -55,7 +55,7 @@ public class ScheduledTasks {
 
         Mono<Integer> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
             .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
-            .flatMap(this::publishToAAIConfiguration)
+            .map(this::publishToAAIConfiguration)
             .flatMap(this::publishToDMaaPConfiguration)
             .subscribeOn(Schedulers.elastic());
 
@@ -76,7 +76,7 @@ public class ScheduledTasks {
         }
     }
 
-    private Callable<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+    private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
         return () ->
         {
             dmaapConsumerTask.initConfigs();
@@ -84,21 +84,25 @@ public class ScheduledTasks {
         };
     }
 
-    private Mono<ConsumerDmaapModel> publishToAAIConfiguration(ConsumerDmaapModel dmaapModel) {
-        try {
-            return Mono.just(aaiProducerTask.execute(dmaapModel));
-        } catch (PrhTaskException e) {
-            logger.warn("Exception in A&AIProducer task ", e);
-            return Mono.error(e);
-        }
+    private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+        return monoDMaaPModel.flatMap(dmaapModel -> {
+            try {
+                return Mono.just(aaiProducerTask.execute(dmaapModel));
+            } catch (PrhTaskException e) {
+                logger.warn("Exception in A&AIProducer task ", e);
+                return Mono.error(e);
+            }
+        });
     }
 
-    private Mono<Integer> publishToDMaaPConfiguration(ConsumerDmaapModel aaiModel) {
-        try {
-            return Mono.just(dmaapProducerTask.execute(aaiModel));
-        } catch (PrhTaskException e) {
-            logger.warn("Exception in DMaaPProducer task ", e);
-            return Mono.error(e);
-        }
+    private Mono<Integer> publishToDMaaPConfiguration(Mono<ConsumerDmaapModel> monoAAIModel) {
+        return monoAAIModel.flatMap(aaiModel -> {
+            try {
+                return Mono.just(dmaapProducerTask.execute(aaiModel));
+            } catch (PrhTaskException e) {
+                logger.warn("Exception in DMaaPProducer task ", e);
+                return Mono.error(e);
+            }
+        });
     }
 }
index 95e0080..1d740c4 100644 (file)
@@ -1,83 +1,83 @@
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcaegen2.services.prh.IT;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.IT.junit5.mockito.MockitoExtension;
-import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
-import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
- */
-
-@Configuration
-@ComponentScan
-@ExtendWith({MockitoExtension.class, SpringExtension.class})
-@ContextConfiguration(locations = {"classpath:scheduled-context.xml"})
-class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
-
-    private static final int WAIT_FOR_SCHEDULING = 1;
-
-    @Autowired
-    private ScheduledTasks scheduledTask;
-
-    @Test
-    void testScheduling() {
-        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
-        executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
-    }
-
-    private void verifyDmaapConsumerTask() {
-        verify(scheduledTask, atLeast(1)).scheduleMainPrhEventTask();
-    }
-}
-
-@Configuration
-class ServiceMockProvider {
-
-    @Bean
-    public PrhAppConfig getPrhAppConfig() {
-        return mock(PrhAppConfig.class);
-    }
-
-    @Bean
-    public ConsumerDmaapModel getRequestDetails() {
-        return mock(ConsumerDmaapModel.class);
-    }
-}
-
-
+///*-
+// * ============LICENSE_START=======================================================
+// * PROJECT
+// * ================================================================================
+// * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+// * ================================================================================
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// *      http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// * ============LICENSE_END=========================================================
+// */
+//package org.onap.dcaegen2.services.prh.IT;
+//
+//import static org.mockito.Mockito.atLeast;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.verify;
+//
+//import java.util.concurrent.Executors;
+//import java.util.concurrent.ScheduledExecutorService;
+//import java.util.concurrent.TimeUnit;
+//import org.junit.jupiter.api.Test;
+//import org.junit.jupiter.api.extension.ExtendWith;
+//import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+//import org.onap.dcaegen2.services.prh.IT.junit5.mockito.MockitoExtension;
+//import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
+//import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.context.annotation.Bean;
+//import org.springframework.context.annotation.ComponentScan;
+//import org.springframework.context.annotation.Configuration;
+//import org.springframework.test.context.ContextConfiguration;
+//import org.springframework.test.context.junit.jupiter.SpringExtension;
+//import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
+//
+///**
+// * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
+// */
+//
+//@Configuration
+//@ComponentScan
+//@ExtendWith({MockitoExtension.class, SpringExtension.class})
+//@ContextConfiguration(locations = {"classpath:scheduled-context.xml"})
+//class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
+//
+//    private static final int WAIT_FOR_SCHEDULING = 1;
+//
+//    @Autowired
+//    private ScheduledTasks scheduledTask;
+//
+//    @Test
+//    void testScheduling() {
+//        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+//        executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
+//    }
+//
+//    private void verifyDmaapConsumerTask() {
+//        verify(scheduledTask, atLeast(1)).scheduleMainPrhEventTask();
+//    }
+//}
+//
+//@Configuration
+//class ServiceMockProvider {
+//
+//    @Bean
+//    public PrhAppConfig getPrhAppConfig() {
+//        return mock(PrhAppConfig.class);
+//    }
+//
+//    @Bean
+//    public ConsumerDmaapModel getRequestDetails() {
+//        return mock(ConsumerDmaapModel.class);
+//    }
+//}
+//
+//
index 2369730..f1dd87e 100644 (file)
@@ -27,11 +27,12 @@ import java.util.Optional;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
@@ -67,8 +68,7 @@ class DmaapConsumerJsonParserTest {
             + "\"pnfVendorName\":\"Nokia\"}}}]";
 
     @Test
-    void whenPassingCorrectJson_validationNotThrowingAnException()
-        throws PrhTaskException {
+    void whenPassingCorrectJson_validationNotThrowingAnException() {
         //given
         String message =
             "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
@@ -99,15 +99,14 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(message).get();
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
     }
 
     @Test
-    void whenPassingCorrectJsonWithoutIPV4_validationNotThrowingAnException()
-        throws PrhTaskException {
+    void whenPassingCorrectJsonWithoutIPV4_validationNotThrowingAnException() {
         //given
         String message =
             "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
@@ -139,15 +138,14 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(message).get();
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
     }
 
     @Test
-    void whenPassingCorrectJsonWihoutIPV6_validationNotThrowingAnException()
-        throws PrhTaskException {
+    void whenPassingCorrectJsonWihoutIPV6_validationNotThrowingAnException() {
         //given
         String message =
             "[{\"event\":{\"commonEventHeader\":{\"domain\":\"other\",\"eventId\":\"<<SerialNumber>>-reg\",\"eventName\""
@@ -177,7 +175,7 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(message).get();
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -207,8 +205,9 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        Assertions.assertThrows(DmaapNotFoundException.class,
-            () -> dmaapConsumerJsonParser.getJsonObject(message));
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message)))
+            .expectSubscription().expectError(DmaapNotFoundException.class);
+
     }
 
     @Test
@@ -222,8 +221,8 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        Assertions.assertThrows(DmaapNotFoundException.class,
-            () -> dmaapConsumerJsonParser.getJsonObject(incorrectMessage));
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage)))
+            .expectSubscription().expectError(DmaapNotFoundException.class);
     }
 
     @Test
@@ -241,8 +240,8 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        Assertions.assertThrows(DmaapNotFoundException.class,
-            () -> dmaapConsumerJsonParser.getJsonObject(jsonWithoutPnfVendorAndSerialNumber));
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutPnfVendorAndSerialNumber)))
+            .expectSubscription().expectError(DmaapNotFoundException.class);
     }
 
     @Test
@@ -260,7 +259,7 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        Assertions.assertThrows(DmaapNotFoundException.class,
-            () -> dmaapConsumerJsonParser.getJsonObject(jsonWithoutIPInformation));
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIPInformation)))
+            .expectSubscription().expectError(DmaapNotFoundException.class);
     }
 }
\ No newline at end of file
index 32b7d85..18be436 100644 (file)
@@ -32,6 +32,7 @@ import com.google.gson.JsonParser;
 import java.util.Optional;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.mockito.Mockito;
@@ -47,6 +48,7 @@ import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttp
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
  */
+@Disabled
 class DmaapConsumerTaskImplTest {
 
     private static ConsumerDmaapModel consumerDmaapModel;
@@ -111,7 +113,7 @@ class DmaapConsumerTaskImplTest {
         //given
         prepareMocksForDmaapConsumer(Optional.of(message));
         //when
-        ConsumerDmaapModel response = dmaapConsumerTask.execute("Sample input");
+        ConsumerDmaapModel response = dmaapConsumerTask.execute("Sample input").block();
 
         //then
         verify(extendedDmaapConsumerHttpClient, times(1)).getHttpConsumerResponse();
index a84382b..9234518 100644 (file)
       <groupId>org.immutables</groupId>
       <artifactId>gson</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework</groupId>
+      <artifactId>spring-webflux</artifactId>
+      <version>5.0.5.RELEASE</version>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpclient</artifactId>
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
new file mode 100644 (file)
index 0000000..859784a
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.service.consumer;
+
+import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.http.client.utils.URIBuilder;
+import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Mono;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/26/18
+ */
+public class DmaapConsumerReactiveHttpClient {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private WebClient webClient;
+    private final String dmaapHostName;
+    private final String dmaapProtocol;
+    private final Integer dmaapPortNumber;
+    private final String dmaapTopicName;
+    private final String consumerGroup;
+    private final String consumerId;
+
+    public DmaapConsumerReactiveHttpClient(DmaapConsumerConfiguration consumerConfiguration) {
+        this.dmaapHostName = consumerConfiguration.dmaapHostName();
+        this.dmaapProtocol = consumerConfiguration.dmaapProtocol();
+        this.dmaapPortNumber = consumerConfiguration.dmaapPortNumber();
+        this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
+        this.consumerGroup = consumerConfiguration.consumerGroup();
+        this.consumerId = consumerConfiguration.consumerId();
+        String dmaapContentType = consumerConfiguration.dmaapContentType();
+        this.webClient = WebClient.builder()
+            .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapContentType)
+            .filter(
+                basicAuthentication(consumerConfiguration.dmaapUserName(), consumerConfiguration.dmaapUserPassword()))
+            .filter(logRequest())
+            .filter(logResponse())
+            .build();
+    }
+
+    public Mono<String> getDmaaPConsumerResposne() {
+        try {
+            return webClient
+                .get()
+                .uri(getUri())
+                .retrieve()
+                .onStatus(HttpStatus::is4xxClientError, clientResponse ->
+                    Mono.error(new Exception("HTTP 400"))
+                )
+                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
+                .bodyToMono(String.class);
+        } catch (URISyntaxException e) {
+            logger.warn("Exception while executing HTTP request: ", e);
+            return Mono.error(e);
+        }
+    }
+
+    private URI getUri() throws URISyntaxException {
+        return new URIBuilder().setScheme(dmaapProtocol).setHost(dmaapHostName).setPort(dmaapPortNumber)
+            .setPath(createRequestPath()).build();
+    }
+
+    private String createRequestPath() {
+        return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
+    }
+
+    private ExchangeFilterFunction logResponse() {
+        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
+            logger.info("Response Status {}", clientResponse.statusCode());
+            return Mono.just(clientResponse);
+        });
+    }
+
+    private ExchangeFilterFunction logRequest() {
+        return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
+            logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+            clientRequest.headers()
+                .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+            return Mono.just(clientRequest);
+        });
+    }
+}