DmaapConsumerReactive fixed tests 89/59089/3
authorwasala <przemyslaw.wasala@nokia.com>
Tue, 26 Jun 2018 17:29:43 +0000 (19:29 +0200)
committerwasala <przemyslaw.wasala@nokia.com>
Tue, 7 Aug 2018 06:32:28 +0000 (08:32 +0200)
Change-Id: I888ef94a084f32a18c77c12a18fb6636a4f33649
Issue-ID: DCAEGEN2-557
Signed-off-by: wasala <przemyslaw.wasala@nokia.com>
12 files changed:
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java [new file with mode: 0644]
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/IT/ScheduledXmlContextITest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImplTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskSpy.java
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DmaapConsumerReactiveHttpClient.java
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java [deleted file]
prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImplTest.java

index 20ec78f..22acf54 100644 (file)
@@ -30,7 +30,6 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.util.StringUtils;
 import reactor.core.publisher.Mono;
 
 /**
@@ -47,20 +46,21 @@ public class DmaapConsumerJsonParser {
     private static final String PNF_SERIAL_NUMBER = "pnfSerialNumber";
 
 
-    public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
+    public Mono<Optional<ConsumerDmaapModel>> getJsonObject(Mono<Optional<String>> monoMessage) {
         return monoMessage.flatMap(message ->
         {
-            if (!StringUtils.isEmpty(message)) {
-                JsonElement jsonElement = new JsonParser().parse(message);
-                ConsumerDmaapModel consumerDmaapModel;
+            if (message.isPresent()) {
+                JsonElement jsonElement = new JsonParser().parse(message.orElse(""));
+                Optional<ConsumerDmaapModel> consumerDmaapModel;
                 try {
                     if (jsonElement.isJsonObject()) {
-                        consumerDmaapModel = create(jsonElement.getAsJsonObject());
+                        consumerDmaapModel = Optional.of(create(jsonElement.getAsJsonObject()));
                     } else {
-                        consumerDmaapModel = create(
-                            StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
-                                .flatMap(this::getJsonObjectFromAnArray)
-                                .orElseThrow(DmaapEmptyResponseException::new));
+                        consumerDmaapModel = Optional
+                            .of(create(
+                                StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false).findFirst()
+                                    .flatMap(this::getJsonObjectFromAnArray)
+                                    .orElseThrow(DmaapEmptyResponseException::new)));
                     }
                     logger.info("Parsed model from DmaaP after getting it: {}", consumerDmaapModel);
                     return Mono.just(consumerDmaapModel);
@@ -112,4 +112,5 @@ public class DmaapConsumerJsonParser {
     private boolean containsHeader(JsonObject jsonObject) {
         return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(OTHER_FIELDS);
     }
+
 }
index d238b34..753d1f9 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcaegen2.services.prh.tasks;
 
+import java.util.Optional;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
@@ -29,11 +30,11 @@ import reactor.core.publisher.Mono;
  */
 abstract class DmaapConsumerTask {
 
-    abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
+    abstract Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException;
 
     abstract DmaapConsumerReactiveHttpClient resolveClient();
 
     abstract void initConfigs();
 
-    protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
+    protected abstract Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException;
 }
index 564a7a4..3181c06 100644 (file)
@@ -24,7 +24,6 @@ import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
-import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
@@ -41,6 +40,7 @@ import reactor.core.publisher.Mono;
 @Component
 public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
+
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final Config prhAppConfig;
     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
@@ -59,18 +59,17 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
 
     @Override
-    Mono<ConsumerDmaapModel> consume(Mono<String> message) {
+    Mono<Optional<ConsumerDmaapModel>> consume(Mono<Optional<String>> message) throws PrhTaskException {
         logger.info("Consumed model from DmaaP: {}", message);
         return dmaapConsumerJsonParser.getJsonObject(message);
     }
 
-
     @Override
-    public Mono<ConsumerDmaapModel> execute(String object) {
+    public Mono<Optional<ConsumerDmaapModel>> execute(String object) throws PrhTaskException {
         dmaapConsumerReactiveHttpClient = resolveClient();
 //        dmaapConsumerReactiveHttpClient.initWebClient();
         logger.trace("Method called with arg {}", object);
-        return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResposne()));
+        return consume((dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()));
     }
 
     @Override
index 37b8686..6fa986e 100644 (file)
@@ -19,6 +19,7 @@
  */
 package org.onap.dcaegen2.services.prh.tasks;
 
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
@@ -76,7 +77,7 @@ public class ScheduledTasks {
         }
     }
 
-    private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
+    private Callable<Mono<Optional<ConsumerDmaapModel>>> consumeFromDMaaPMessage() {
         return () ->
         {
             dmaapConsumerTask.initConfigs();
@@ -84,10 +85,10 @@ public class ScheduledTasks {
         };
     }
 
-    private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+    private Mono<ConsumerDmaapModel> publishToAAIConfiguration(Mono<Optional<ConsumerDmaapModel>> monoDMaaPModel) {
         return monoDMaaPModel.flatMap(dmaapModel -> {
             try {
-                return Mono.just(aaiProducerTask.execute(dmaapModel));
+                return Mono.just(aaiProducerTask.execute(dmaapModel.get()));
             } catch (PrhTaskException e) {
                 logger.warn("Exception in A&AIProducer task ", e);
                 return Mono.error(e);
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/Task.java
new file mode 100644 (file)
index 0000000..c26028a
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
+ */
+
+
+public abstract class Task<R, S, C> {
+
+    Task taskProcess;
+
+    protected abstract void receiveRequest(R body) throws PrhTaskException;
+
+    protected abstract S execute(R object) throws PrhTaskException;
+
+    protected abstract C resolveConfiguration();
+
+    void setNext(Task task) {
+        this.taskProcess = task;
+    }
+}
index 1d740c4..95e0080 100644 (file)
@@ -1,83 +1,83 @@
-///*-
-// * ============LICENSE_START=======================================================
-// * PROJECT
-// * ================================================================================
-// * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
-// * ================================================================================
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *      http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// * ============LICENSE_END=========================================================
-// */
-//package org.onap.dcaegen2.services.prh.IT;
-//
-//import static org.mockito.Mockito.atLeast;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.verify;
-//
-//import java.util.concurrent.Executors;
-//import java.util.concurrent.ScheduledExecutorService;
-//import java.util.concurrent.TimeUnit;
-//import org.junit.jupiter.api.Test;
-//import org.junit.jupiter.api.extension.ExtendWith;
-//import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-//import org.onap.dcaegen2.services.prh.IT.junit5.mockito.MockitoExtension;
-//import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
-//import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.ComponentScan;
-//import org.springframework.context.annotation.Configuration;
-//import org.springframework.test.context.ContextConfiguration;
-//import org.springframework.test.context.junit.jupiter.SpringExtension;
-//import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
-//
-///**
-// * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
-// */
-//
-//@Configuration
-//@ComponentScan
-//@ExtendWith({MockitoExtension.class, SpringExtension.class})
-//@ContextConfiguration(locations = {"classpath:scheduled-context.xml"})
-//class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
-//
-//    private static final int WAIT_FOR_SCHEDULING = 1;
-//
-//    @Autowired
-//    private ScheduledTasks scheduledTask;
-//
-//    @Test
-//    void testScheduling() {
-//        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
-//        executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
-//    }
-//
-//    private void verifyDmaapConsumerTask() {
-//        verify(scheduledTask, atLeast(1)).scheduleMainPrhEventTask();
-//    }
-//}
-//
-//@Configuration
-//class ServiceMockProvider {
-//
-//    @Bean
-//    public PrhAppConfig getPrhAppConfig() {
-//        return mock(PrhAppConfig.class);
-//    }
-//
-//    @Bean
-//    public ConsumerDmaapModel getRequestDetails() {
-//        return mock(ConsumerDmaapModel.class);
-//    }
-//}
-//
-//
+/*-
+ * ============LICENSE_START=======================================================
+ * PROJECT
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.IT;
+
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.IT.junit5.mockito.MockitoExtension;
+import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
+import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
+ */
+
+@Configuration
+@ComponentScan
+@ExtendWith({MockitoExtension.class, SpringExtension.class})
+@ContextConfiguration(locations = {"classpath:scheduled-context.xml"})
+class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
+
+    private static final int WAIT_FOR_SCHEDULING = 1;
+
+    @Autowired
+    private ScheduledTasks scheduledTask;
+
+    @Test
+    void testScheduling() {
+        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+        executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
+    }
+
+    private void verifyDmaapConsumerTask() {
+        verify(scheduledTask, atLeast(1)).scheduleMainPrhEventTask();
+    }
+}
+
+@Configuration
+class ServiceMockProvider {
+
+    @Bean
+    public PrhAppConfig getPrhAppConfig() {
+        return mock(PrhAppConfig.class);
+    }
+
+    @Bean
+    public ConsumerDmaapModel getRequestDetails() {
+        return mock(ConsumerDmaapModel.class);
+    }
+}
+
+
index f1dd87e..f24ef41 100644 (file)
@@ -28,7 +28,6 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import reactor.core.publisher.Mono;
@@ -99,7 +98,8 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
+            .getJsonObject(Mono.just(Optional.of(message))).block().get();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -138,7 +138,8 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message)))
+            .block().get();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -175,7 +176,8 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(message)).block();
+        ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message)))
+            .block().get();
         //then
         Assertions.assertNotNull(consumerDmaapModel);
         Assertions.assertEquals(expectedObject, consumerDmaapModel);
@@ -205,7 +207,7 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(message)))
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(message))))
             .expectSubscription().expectError(DmaapNotFoundException.class);
 
     }
@@ -221,7 +223,7 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(incorrectMessage)))
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(incorrectMessage))))
             .expectSubscription().expectError(DmaapNotFoundException.class);
     }
 
@@ -240,7 +242,8 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutPnfVendorAndSerialNumber)))
+        StepVerifier
+            .create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(jsonWithoutPnfVendorAndSerialNumber))))
             .expectSubscription().expectError(DmaapNotFoundException.class);
     }
 
@@ -259,7 +262,7 @@ class DmaapConsumerJsonParserTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(jsonWithoutIPInformation)))
+        StepVerifier.create(dmaapConsumerJsonParser.getJsonObject(Mono.just(Optional.of(jsonWithoutIPInformation))))
             .expectSubscription().expectError(DmaapNotFoundException.class);
     }
-}
\ No newline at end of file
+}
index 18be436..f9d7c7f 100644 (file)
@@ -34,26 +34,27 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.function.Executable;
 import org.mockito.Mockito;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.config.ImmutableDmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
  */
-@Disabled
 class DmaapConsumerTaskImplTest {
 
     private static ConsumerDmaapModel consumerDmaapModel;
     private static DmaapConsumerTaskImpl dmaapConsumerTask;
-    private static ExtendedDmaapConsumerHttpClientImpl extendedDmaapConsumerHttpClient;
+    private static DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
     private static AppConfig appConfig;
     private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
     private static String message;
@@ -93,19 +94,16 @@ class DmaapConsumerTaskImplTest {
     }
 
     @Test
-    public void whenPassedObjectDoesntFit_DoesNotThrowPrhTaskException() {
+    public void whenPassedObjectDoesntFit_DoesNotThrowPrhTaskException() throws PrhTaskException {
         //given
         prepareMocksForDmaapConsumer(Optional.empty());
 
-        //when
-        Executable executableFunction = () -> dmaapConsumerTask.execute("Sample input");
-
         //then
-        Assertions
-            .assertThrows(PrhTaskException.class, executableFunction,
-                "Throwing exception when http response code won't fit to assignment range");
-        verify(extendedDmaapConsumerHttpClient, times(1)).getHttpConsumerResponse();
-        verifyNoMoreInteractions(extendedDmaapConsumerHttpClient);
+        StepVerifier.create(dmaapConsumerTask.execute("Sample input")).expectSubscription()
+            .expectError(DmaapEmptyResponseException.class);
+
+        verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaaPConsumerResponse();
+        verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
     }
 
     @Test
@@ -113,13 +111,14 @@ class DmaapConsumerTaskImplTest {
         //given
         prepareMocksForDmaapConsumer(Optional.of(message));
         //when
-        ConsumerDmaapModel response = dmaapConsumerTask.execute("Sample input").block();
+
+        Mono<Optional<ConsumerDmaapModel>> response = dmaapConsumerTask.execute("Sample input");
 
         //then
-        verify(extendedDmaapConsumerHttpClient, times(1)).getHttpConsumerResponse();
-        verifyNoMoreInteractions(extendedDmaapConsumerHttpClient);
+        verify(dmaapConsumerReactiveHttpClient, times(1)).getDmaaPConsumerResponse();
+        verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
         Assertions.assertNotNull(response);
-        Assertions.assertEquals(consumerDmaapModel, response);
+        Assertions.assertEquals(consumerDmaapModel, response.block().get());
 
     }
 
@@ -128,11 +127,11 @@ class DmaapConsumerTaskImplTest {
         JsonElement jsonElement = new JsonParser().parse(parsed);
         Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
             .when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
-        extendedDmaapConsumerHttpClient = mock(ExtendedDmaapConsumerHttpClientImpl.class);
-        when(extendedDmaapConsumerHttpClient.getHttpConsumerResponse()).thenReturn(message);
+        dmaapConsumerReactiveHttpClient = mock(DmaapConsumerReactiveHttpClient.class);
+        when(dmaapConsumerReactiveHttpClient.getDmaaPConsumerResponse()).thenReturn(Mono.just(message));
         when(appConfig.getDmaapConsumerConfiguration()).thenReturn(dmaapConsumerConfiguration);
         dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig, dmaapConsumerJsonParser));
         when(dmaapConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
-        doReturn(extendedDmaapConsumerHttpClient).when(dmaapConsumerTask).resolveClient();
+        doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
     }
 }
\ No newline at end of file
index 5736afe..225dd3f 100644 (file)
@@ -22,13 +22,10 @@ package org.onap.dcaegen2.services.prh.tasks;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
 
-import org.onap.dcaegen2.services.prh.config.AAIClientConfiguration;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
-import org.onap.dcaegen2.services.prh.service.AAIProducerClient;
-import org.onap.dcaegen2.services.prh.service.consumer.ExtendedDmaapConsumerHttpClientImpl;
+import org.onap.dcaegen2.services.prh.service.consumer.DmaapConsumerReactiveHttpClient;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
@@ -45,10 +42,10 @@ public class DmaapConsumerTaskSpy {
         AppConfig appConfig = spy(AppConfig.class);
         doReturn(mock(DmaapConsumerConfiguration.class)).when(appConfig).getDmaapConsumerConfiguration();
         DmaapConsumerTaskImpl dmaapConsumerTask = spy(new DmaapConsumerTaskImpl(appConfig));
-        ExtendedDmaapConsumerHttpClientImpl extendedDmaapConsumerHttpClient = mock(
-            ExtendedDmaapConsumerHttpClientImpl.class);
+        DmaapConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient = mock(
+            DmaapConsumerReactiveHttpClient.class);
         doReturn(mock(DmaapConsumerConfiguration.class)).when(dmaapConsumerTask).resolveConfiguration();
-        doReturn(extendedDmaapConsumerHttpClient).when(dmaapConsumerTask).resolveClient();
+        doReturn(dmaapConsumerReactiveHttpClient).when(dmaapConsumerTask).resolveClient();
         return dmaapConsumerTask;
     }
 }
index 859784a..a99833d 100644 (file)
@@ -24,6 +24,7 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Optional;
 import org.apache.http.client.utils.URIBuilder;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class DmaapConsumerReactiveHttpClient {
             .build();
     }
 
-    public Mono<String> getDmaaPConsumerResposne() {
+    public Mono<Optional<String>> getDmaaPConsumerResponse() {
         try {
             return webClient
                 .get()
@@ -75,8 +76,10 @@ public class DmaapConsumerReactiveHttpClient {
                 .onStatus(HttpStatus::is4xxClientError, clientResponse ->
                     Mono.error(new Exception("HTTP 400"))
                 )
-                .onStatus(HttpStatus::is5xxServerError, clientResponse -> Mono.error(new Exception("HTTP 500")))
-                .bodyToMono(String.class);
+                .onStatus(HttpStatus::is5xxServerError, clientResponse ->
+                    Mono.error(new Exception("HTTP 500")))
+                .bodyToMono(String.class)
+                .map(Optional::of);
         } catch (URISyntaxException e) {
             logger.warn("Exception while executing HTTP request: ", e);
             return Mono.error(e);
diff --git a/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java b/prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/ExtendedDmaapConsumerHttpClientImpl.java
deleted file mode 100644 (file)
index 0f8fe28..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.service.consumer;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Optional;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpRequestBase;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.util.EntityUtils;
-import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.prh.service.DmaapHttpClientImpl;
-import org.onap.dcaegen2.services.prh.service.HttpUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ExtendedDmaapConsumerHttpClientImpl {
-
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    private final CloseableHttpClient closeableHttpClient;
-    private final String dmaapHostName;
-    private final String dmaapProtocol;
-    private final Integer dmaapPortNumber;
-    private final String dmaapTopicName;
-    private final String consumerGroup;
-    private final String consumerId;
-    private final String dmaapContentType;
-
-
-    public ExtendedDmaapConsumerHttpClientImpl(DmaapConsumerConfiguration configuration) {
-        this.closeableHttpClient = new DmaapHttpClientImpl(configuration).getHttpClient();
-        this.dmaapHostName = configuration.dmaapHostName();
-        this.dmaapProtocol = configuration.dmaapProtocol();
-        this.dmaapPortNumber = configuration.dmaapPortNumber();
-        this.dmaapTopicName = configuration.dmaapTopicName();
-        this.consumerGroup = configuration.consumerGroup();
-        this.consumerId = configuration.consumerId();
-        this.dmaapContentType = configuration.dmaapContentType();
-    }
-
-    public Optional<String> getHttpConsumerResponse() {
-
-        try {
-            return createRequest()
-                .flatMap(this::executeHttpClient);
-        } catch (URISyntaxException e) {
-            logger.warn("Exception while executing HTTP request: ", e);
-        }
-        return Optional.empty();
-    }
-
-    private Optional<String> executeHttpClient(HttpRequestBase httpRequestBase) {
-        try {
-            return closeableHttpClient.execute(httpRequestBase, getDmaapConsumerResponseHandler());
-        } catch (IOException e) {
-            logger.warn("Exception while executing HTTP request: ", e);
-        }
-        return Optional.empty();
-    }
-
-    private Optional<HttpRequestBase> createRequest() throws URISyntaxException {
-        return "application/json".equals(dmaapContentType)
-            ? createDmaapConsumerExtendedURI().map(this::createHttpRequest)
-            : Optional.empty();
-    }
-
-    private HttpRequestBase createHttpRequest(URI extendedURI) {
-        HttpRequestBase httpRequestBase = new HttpGet(extendedURI);
-        httpRequestBase.addHeader("Content-type", dmaapContentType);
-        return httpRequestBase;
-    }
-
-
-    private String createRequestPath() {
-        return dmaapTopicName + "/" + consumerGroup + "/" + consumerId;
-    }
-
-    private Optional<URI> createDmaapConsumerExtendedURI() throws URISyntaxException {
-        return Optional.ofNullable(new URIBuilder()
-            .setScheme(dmaapProtocol)
-            .setHost(dmaapHostName)
-            .setPort(dmaapPortNumber)
-            .setPath(createRequestPath()).build());
-    }
-
-    private ResponseHandler<Optional<String>> getDmaapConsumerResponseHandler() {
-        return httpResponse -> {
-            final int responseCode = httpResponse.getStatusLine().getStatusCode();
-            logger.info("Status code of operation: {}", responseCode);
-            final HttpEntity responseEntity = httpResponse.getEntity();
-
-            if (HttpUtils.isSuccessfulResponseCode(responseCode)) {
-                logger.trace("HTTP response successful.");
-                final String dmaapResponse = EntityUtils.toString(responseEntity);
-                return Optional.of(dmaapResponse);
-            } else {
-                String dmaapResponse = responseEntity != null ? EntityUtils.toString(responseEntity) : "";
-                logger.trace("HTTP response not successful : {}", dmaapResponse);
-                return Optional.of(String.valueOf(responseCode));
-            }
-        };
-    }
-}
-
-
index 39c10de..d2c0e77 100644 (file)
@@ -1,96 +1,96 @@
-/*-
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.service.consumer;
-
-import org.apache.http.client.ResponseHandler;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Optional;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class ExtendedDmaapConsumerHttpClientImplTest {
-
-    private static ExtendedDmaapConsumerHttpClientImpl objectUnderTest;
-
-    private static DmaapConsumerConfiguration configurationMock = mock(DmaapConsumerConfiguration.class);
-    private static CloseableHttpClient closeableHttpClientMock = mock(CloseableHttpClient.class);
-
-    private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\" }";
-
-    private static Optional<String> expectedResult = Optional.empty();
-
-    @BeforeAll
-    public static void init() throws NoSuchFieldException, IllegalAccessException {
-
-        when(configurationMock.dmaapHostName()).thenReturn("54.45.33.2");
-        when(configurationMock.dmaapProtocol()).thenReturn("https");
-        when(configurationMock.dmaapPortNumber()).thenReturn(1234);
-        when(configurationMock.dmaapUserName()).thenReturn("PRH");
-        when(configurationMock.dmaapUserPassword()).thenReturn("PRH");
-        when(configurationMock.dmaapContentType()).thenReturn("application/json");
-        when(configurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
-        when(configurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
-        when(configurationMock.consumerId()).thenReturn("c12");
-
-        objectUnderTest = new ExtendedDmaapConsumerHttpClientImpl(configurationMock);
-
-        setField();
-    }
-
-
-    @Test
-    public void getHttpResponseGet_success() throws IOException {
-        expectedResult = Optional.of(JSON_MESSAGE);
-
-        when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class)))
-                .thenReturn(expectedResult);
-
-        Optional<String> actualResult = objectUnderTest.getHttpConsumerResponse();
-
-        Assertions.assertEquals(expectedResult.get(), actualResult.get());
-    }
-
-    @Test
-    public void getExtendedDetails_returnsNull() throws IOException {
-        when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class))).
-                thenReturn(Optional.empty());
-        Optional<String>  actualResult = objectUnderTest.getHttpConsumerResponse();
-        Assertions.assertEquals(Optional.empty(),actualResult);
-    }
-
-
-    private static void setField() throws NoSuchFieldException, IllegalAccessException {
-        Field field = objectUnderTest.getClass().getDeclaredField("closeableHttpClient");
-        field.setAccessible(true);
-        field.set(objectUnderTest, closeableHttpClientMock);
-    }
-}
+///*-
+// * ============LICENSE_START=======================================================
+// * PNF-REGISTRATION-HANDLER
+// * ================================================================================
+// * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+// * ================================================================================
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// *
+// *      http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// * ============LICENSE_END=========================================================
+// */
+//
+//package org.onap.dcaegen2.services.prh.service.consumer;
+//
+//import org.apache.http.client.ResponseHandler;
+//import org.apache.http.client.methods.HttpGet;
+//import org.apache.http.impl.client.CloseableHttpClient;
+//import org.junit.jupiter.api.Assertions;
+//import org.junit.jupiter.api.BeforeAll;
+//import org.junit.jupiter.api.Test;
+//import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
+//
+//import java.io.IOException;
+//import java.lang.reflect.Field;
+//import java.util.Optional;
+//
+//import static org.mockito.ArgumentMatchers.any;
+//import static org.mockito.Mockito.mock;
+//import static org.mockito.Mockito.when;
+//
+//
+//public class ExtendedDmaapConsumerHttpClientImplTest {
+//
+//    private static ExtendedDmaapConsumerHttpClientImpl objectUnderTest;
+//
+//    private static DmaapConsumerConfiguration configurationMock = mock(DmaapConsumerConfiguration.class);
+//    private static CloseableHttpClient closeableHttpClientMock = mock(CloseableHttpClient.class);
+//
+//    private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\" }";
+//
+//    private static Optional<String> expectedResult = Optional.empty();
+//
+//    @BeforeAll
+//    public static void init() throws NoSuchFieldException, IllegalAccessException {
+//
+//        when(configurationMock.dmaapHostName()).thenReturn("54.45.33.2");
+//        when(configurationMock.dmaapProtocol()).thenReturn("https");
+//        when(configurationMock.dmaapPortNumber()).thenReturn(1234);
+//        when(configurationMock.dmaapUserName()).thenReturn("PRH");
+//        when(configurationMock.dmaapUserPassword()).thenReturn("PRH");
+//        when(configurationMock.dmaapContentType()).thenReturn("application/json");
+//        when(configurationMock.dmaapTopicName()).thenReturn("unauthenticated.SEC_OTHER_OUTPUT");
+//        when(configurationMock.consumerGroup()).thenReturn("OpenDCAE-c12");
+//        when(configurationMock.consumerId()).thenReturn("c12");
+//
+//        objectUnderTest = new ExtendedDmaapConsumerHttpClientImpl(configurationMock);
+//
+//        setField();
+//    }
+//
+//
+//    @Test
+//    public void getHttpResponseGet_success() throws IOException {
+//        expectedResult = Optional.of(JSON_MESSAGE);
+//
+//        when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class)))
+//                .thenReturn(expectedResult);
+//
+//        Optional<String> actualResult = objectUnderTest.getHttpConsumerResponse();
+//
+//        Assertions.assertEquals(expectedResult.get(), actualResult.get());
+//    }
+//
+//    @Test
+//    public void getExtendedDetails_returnsNull() throws IOException {
+//        when(closeableHttpClientMock.execute(any(HttpGet.class), any(ResponseHandler.class))).
+//                thenReturn(Optional.empty());
+//        Optional<String>  actualResult = objectUnderTest.getHttpConsumerResponse();
+//        Assertions.assertEquals(Optional.empty(),actualResult);
+//    }
+//
+//
+//    private static void setField() throws NoSuchFieldException, IllegalAccessException {
+//        Field field = objectUnderTest.getClass().getDeclaredField("closeableHttpClient");
+//        field.setAccessible(true);
+//        field.set(objectUnderTest, closeableHttpClientMock);
+//    }
+//}