Refatoring due to prh workflow 39/64339/1
authorpwielebs <piotr.wielebski@nokia.com>
Tue, 4 Sep 2018 07:29:49 +0000 (09:29 +0200)
committerpwielebs <piotr.wielebski@nokia.com>
Tue, 4 Sep 2018 07:29:49 +0000 (09:29 +0200)
1. Added specified HttpClient for DmaaPPublisher:
*DmaaP Handle transfer-encoding: chunk header and
reject the request if it will be set by the client.
In conclusion no other reactive http client can be
used for pushing something to dmaap.

2. Added sll support to A&AI rective webclient.
*Behaviour of reactive A&AI HttpClient is different as
in native spring have without it.

3. Added 10s fixed time in PRH for requesting DmaaP.

4. Added debug log in reactive/native http clients.

5. Fixed reactive workflow of prh.

6. Updated the version of:
* spring-boot-dependencies:2.0.1.RELEASE->2.0.4.RELEASE
* spring-boot-starter-reactor-netty:2.0.2.RELEASE->2.0.4.RELEASE
* spring-webflux:5.0.5.RELEASE->5.0.8.RELEASE
* reactor-bom:Bismuth-RELEASE->Bismuth-SR10

Change-Id: I815ffb5bdcf48d94f3b7c64040a73e98e404a5e8
Issue-ID: DCAEGEN2-743
Signed-off-by: pwielebs <piotr.wielebski@nokia.com>
33 files changed:
pom.xml
prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/AaiReactiveWebClient.java
prh-aai-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClient.java
prh-aai-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/AaiProducerReactiveHttpClientTest.java
prh-app-server/config/application.yaml
prh-app-server/config/prh_endpoints.json
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/SchedulerConfig.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/HttpGetClient.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PrhConfigurationProvider.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
prh-app-server/src/main/resources/application.properties
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImplTest.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiPublisherTaskSpy.java
prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImplTest.java
prh-commons/src/main/java/org/onap/dcaegen2/services/prh/model/CommonFunctions.java
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/DMaaPReactiveWebClient.java
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClient.java
prh-dmaap-client/src/main/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClient.java
prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/consumer/DMaaPConsumerReactiveHttpClientTest.java
prh-dmaap-client/src/test/java/org/onap/dcaegen2/services/prh/service/producer/DMaaPProducerReactiveHttpClientTest.java

diff --git a/pom.xml b/pom.xml
index 06b0387..82cdab7 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <dependency>
         <groupId>io.projectreactor</groupId>
         <artifactId>reactor-bom</artifactId>
-        <version>Bismuth-RELEASE</version>
+        <version>Bismuth-SR10</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
index 6daf54a..55dcb39 100644 (file)
@@ -24,11 +24,18 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE
 import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.SERVICE_NAME;
 import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
 
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+
 import java.util.Map;
+import javax.net.ssl.SSLException;
+
 import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
 import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
@@ -36,7 +43,7 @@ import reactor.core.publisher.Mono;
 
 public class AaiReactiveWebClient {
 
-    private Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger LOGGER = LoggerFactory.getLogger(AaiReactiveWebClient.class);
 
     private String aaiUserName;
     private String aaiUserPassword;
@@ -60,8 +67,19 @@ public class AaiReactiveWebClient {
      *
      * @return WebClient
      */
-    public WebClient build() {
+    public WebClient build() throws SSLException {
+        SslContext sslContext;
+        sslContext = SslContextBuilder
+            .forClient()
+            .trustManager(InsecureTrustManagerFactory.INSTANCE)
+            .build();
+        LOGGER.debug("Setting ssl context");
+
         return WebClient.builder()
+            .clientConnector(new ReactorClientHttpConnector(clientOptions -> {
+                clientOptions.sslContext(sslContext);
+                clientOptions.disablePool();
+            }))
             .defaultHeaders(httpHeaders -> httpHeaders.setAll(aaiHeaders))
             .filter(basicAuthentication(aaiUserName, aaiUserPassword))
             .filter(logRequest())
@@ -72,9 +90,9 @@ public class AaiReactiveWebClient {
     private ExchangeFilterFunction logRequest() {
         return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
             MDC.put(SERVICE_NAME, String.valueOf(clientRequest.url()));
-            logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+            LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url());
             clientRequest.headers()
-                .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+                .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value)));
             return Mono.just(clientRequest);
         });
     }
@@ -82,7 +100,7 @@ public class AaiReactiveWebClient {
     private ExchangeFilterFunction logResponse() {
         return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
             MDC.put(RESPONSE_CODE, String.valueOf(clientResponse.statusCode()));
-            logger.info("Response Status {}", clientResponse.statusCode());
+            LOGGER.info("Response Status {}", clientResponse.statusCode());
             return Mono.just(clientResponse);
         });
     }
index be6c63e..358a452 100644 (file)
 
 package org.onap.dcaegen2.services.prh.service.producer;
 
-import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID;
-import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID;
-import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.UUID;
 import org.apache.http.client.utils.URIBuilder;
 import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.prh.exceptions.AaiRequestException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+
+import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody;
+import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.*;
+
 
 public class AaiProducerReactiveHttpClient {
 
+    private WebClient webClient;
     private final String aaiHost;
     private final String aaiProtocol;
     private final Integer aaiHostPortNumber;
     private final String aaiBasePath;
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-    private WebClient webClient;
-
+    private final String aaiPnfPath;
 
     /**
      * Constructor of AaiProducerReactiveHttpClient.
@@ -60,6 +57,7 @@ public class AaiProducerReactiveHttpClient {
         this.aaiProtocol = configuration.aaiProtocol();
         this.aaiHostPortNumber = configuration.aaiPort();
         this.aaiBasePath = configuration.aaiBasePath();
+        this.aaiPnfPath = configuration.aaiPnfPath();
     }
 
     /**
@@ -68,10 +66,12 @@ public class AaiProducerReactiveHttpClient {
      * @param consumerDmaapModelMono - object which will be sent to AAI database
      * @return status code of operation
      */
-    public Mono<Integer> getAaiProducerResponse(Mono<ConsumerDmaapModel> consumerDmaapModelMono) {
-        return consumerDmaapModelMono
-            .doOnNext(consumerDmaapModel -> logger.info("Sending PNF model to AAI {}", consumerDmaapModel))
-            .flatMap(this::patchAaiRequest);
+    public Mono<ClientResponse> getAaiProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
+        try {
+            return patchAaiRequest(consumerDmaapModelMono);
+        } catch (URISyntaxException e) {
+            return Mono.error(e);
+        }
     }
 
     public AaiProducerReactiveHttpClient createAaiWebClient(WebClient webClient) {
@@ -79,26 +79,14 @@ public class AaiProducerReactiveHttpClient {
         return this;
     }
 
-    private Mono<Integer> patchAaiRequest(ConsumerDmaapModel dmaapModel) {
-        try {
-            return webClient.patch()
+    private Mono<ClientResponse> patchAaiRequest(ConsumerDmaapModel dmaapModel) throws URISyntaxException {
+        return
+            webClient.patch()
                 .uri(getUri(dmaapModel.getSourceName()))
                 .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
                 .header(X_INVOCATION_ID, UUID.randomUUID().toString())
-                .body(BodyInserters.fromObject(dmaapModel))
-                .retrieve()
-                .onStatus(
-                    HttpStatus::is4xxClientError,
-                    clientResponse -> Mono
-                        .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode()))
-                )
-                .onStatus(HttpStatus::is5xxServerError,
-                    clientResponse -> Mono
-                        .error(new AaiRequestException("AaiProducer HTTP " + clientResponse.statusCode())))
-                .bodyToMono(Integer.class);
-        } catch (URISyntaxException e) {
-            return Mono.error(e);
-        }
+                .body(Mono.just(createJsonBody(dmaapModel)), String.class)
+                .exchange();
     }
 
     URI getUri(String pnfName) throws URISyntaxException {
@@ -106,7 +94,7 @@ public class AaiProducerReactiveHttpClient {
             .setScheme(aaiProtocol)
             .setHost(aaiHost)
             .setPort(aaiHostPortNumber)
-            .setPath(aaiBasePath + "/" + pnfName)
+            .setPath(aaiBasePath + aaiPnfPath + "/" + pnfName)
             .build();
     }
 }
index 9b0f4fe..4160f35 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dcaegen2.services.prh.service.producer;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -31,18 +32,19 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
+
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
+import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-
 class AaiProducerReactiveHttpClientTest {
 
     private static final Integer SUCCESS_RESPONSE = 200;
@@ -50,16 +52,21 @@ class AaiProducerReactiveHttpClientTest {
     private static AaiClientConfiguration aaiConfigurationMock = mock(AaiClientConfiguration.class);
     private static WebClient webClient = mock(WebClient.class);
 
-    private static ConsumerDmaapModel dmaapModel = new ConsumerDmaapModelForUnitTest();
-    private static WebClient.RequestBodyUriSpec requestBodyUriSpec;
-    private static ResponseSpec responseSpec;
 
-    private static Map<String, String> aaiHeaders;
+    private ConsumerDmaapModel dmaapModel = spy(new ConsumerDmaapModelForUnitTest());
+    private WebClient.RequestBodyUriSpec requestBodyUriSpec;
+    private ResponseSpec responseSpec;
 
-    @BeforeAll
-    static void setUp() {
-        setupHeaders();
+    private Map<String, String> aaiHeaders;
+    private ClientResponse clientResponse;
+    private Mono<ClientResponse> clientResponseMono;
 
+    @BeforeEach
+    void setUp() {
+        setupHeaders();
+        clientResponse = mock(ClientResponse.class);
+        clientResponseMono = Mono.just(clientResponse);
+        when(dmaapModel.getSourceName()).thenReturn("NOKnhfsadhff");
         when(aaiConfigurationMock.aaiHost()).thenReturn("54.45.33.2");
         when(aaiConfigurationMock.aaiProtocol()).thenReturn("https");
         when(aaiConfigurationMock.aaiPort()).thenReturn(1234);
@@ -80,15 +87,6 @@ class AaiProducerReactiveHttpClientTest {
         responseSpec = mock(ResponseSpec.class);
     }
 
-    private static void setupHeaders() {
-        aaiHeaders = new HashMap<>();
-        aaiHeaders.put("X-FromAppId", "PRH");
-        aaiHeaders.put("X-TransactionId", "vv-temp");
-        aaiHeaders.put("Accept", "application/json");
-        aaiHeaders.put("Real-Time", "true");
-        aaiHeaders.put("Content-Type", "application/merge-patch+json");
-    }
-
     @Test
     void getAaiProducerResponse_shouldReturn200() {
         //given
@@ -98,12 +96,11 @@ class AaiProducerReactiveHttpClientTest {
         mockWebClientDependantObject();
         doReturn(expectedResult).when(responseSpec).bodyToMono(Integer.class);
         aaiProducerReactiveHttpClient.createAaiWebClient(webClient);
-        Mono<Integer> response = aaiProducerReactiveHttpClient.getAaiProducerResponse(Mono.just(dmaapModel));
 
         //then
-        StepVerifier.create(response).expectSubscription()
+        StepVerifier.create(aaiProducerReactiveHttpClient.getAaiProducerResponse(dmaapModel)).expectSubscription()
             .expectNextMatches(results -> {
-                Assertions.assertEquals(results, expectedResult.block());
+                Assertions.assertEquals(results, clientResponse);
                 return true;
             }).verifyComplete();
     }
@@ -115,24 +112,37 @@ class AaiProducerReactiveHttpClientTest {
         //when
         when(webClient.patch()).thenReturn(requestBodyUriSpec);
         aaiProducerReactiveHttpClient.createAaiWebClient(webClient);
-        when(aaiProducerReactiveHttpClient.getUri("pnfName")).thenThrow(URISyntaxException.class);
-
+        doThrow(URISyntaxException.class).when(aaiProducerReactiveHttpClient).getUri(any());
         //then
         StepVerifier.create(
             aaiProducerReactiveHttpClient.getAaiProducerResponse(
-                Mono.just(dmaapModel)
+                dmaapModel
             )).expectSubscription().expectError(Exception.class).verify();
     }
 
+    @Test
+    void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+        Assertions.assertEquals(aaiProducerReactiveHttpClient.getUri("NOKnhfsadhff"),
+            URI.create("https://54.45.33.2:1234/aai/v11/network/pnfs/pnf/NOKnhfsadhff"));
+    }
+
+
+    private void setupHeaders() {
+        aaiHeaders = new HashMap<>();
+        aaiHeaders.put("X-FromAppId", "PRH");
+        aaiHeaders.put("X-TransactionId", "vv-temp");
+        aaiHeaders.put("Accept", "application/json");
+        aaiHeaders.put("Real-Time", "true");
+        aaiHeaders.put("Content-Type", "application/merge-patch+json");
+    }
+
     private void mockWebClientDependantObject() {
         WebClient.RequestHeadersSpec requestHeadersSpec = mock(WebClient.RequestHeadersSpec.class);
         when(webClient.patch()).thenReturn(requestBodyUriSpec);
         when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec);
         when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec);
-        when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec);
-        doReturn(responseSpec).when(requestHeadersSpec).retrieve();
-        doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+        when(requestBodyUriSpec.body(any(), (Class<Object>) any())).thenReturn(requestHeadersSpec);
+        when(requestHeadersSpec.exchange()).thenReturn(clientResponseMono);
     }
-
 }
 
index 390ea9d..2e6f54d 100644 (file)
@@ -12,8 +12,10 @@ server:
 logging:
   level:
     ROOT: ERROR
+    org.onap.dcaegen2.services.prh: INFO
+    reactor.ipc.netty.http.client: WARN
     org.springframework: ERROR
     org.springframework.data: ERROR
-    org.onap.dcaegen2.services.prh: INFO
+    org.springframework.web.reactive: WARN
 app:
   filepath: config/prh_endpoints.json
\ No newline at end of file
index e5d1c7b..b3bff7d 100644 (file)
@@ -35,6 +35,8 @@
         "aaiBasePath": "/aai/v12",
         "aaiPnfPath": "/network/pnfs/pnf",
         "aaiHeaders": {
+          "X-FromAppId": "prh",
+          "X-TransactionId": "9999",
           "Accept": "application/json",
           "Real-Time": "true",
           "Content-Type": "application/merge-patch+json"
index fc485e1..96d47e3 100644 (file)
 package org.onap.dcaegen2.services.prh;
 
 import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -44,17 +42,7 @@ public class MainApp {
     }
 
     @Bean
-    ConcurrentTaskScheduler concurrentTaskScheduler() {
+    TaskScheduler concurrentTaskScheduler() {
         return new ConcurrentTaskScheduler();
     }
-
-    @Bean
-    ThreadPoolTaskScheduler threadPoolTaskScheduler() {
-        ThreadPoolTaskScheduler threadPoolTaskScheduler
-            = new ThreadPoolTaskScheduler();
-        threadPoolTaskScheduler.setPoolSize(5);
-        threadPoolTaskScheduler.setThreadNamePrefix(
-            "CloudThreadPoolTaskScheduler");
-        return threadPoolTaskScheduler;
-    }
 }
index bc4bbf8..11c75e8 100644 (file)
@@ -36,7 +36,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
 import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import reactor.core.publisher.Flux;
 import reactor.core.scheduler.Schedulers;
 
@@ -49,7 +48,7 @@ import reactor.core.scheduler.Schedulers;
 @Primary
 public class CloudConfiguration extends AppConfig {
 
-    private Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class);
     private PrhConfigurationProvider prhConfigurationProvider;
 
     private AaiClientConfiguration aaiClientCloudConfiguration;
@@ -72,21 +71,21 @@ public class CloudConfiguration extends AppConfig {
     }
 
     private void parsingConfigError(Throwable throwable) {
-        logger.warn("Error in case of processing system environment, more details below: ", throwable);
+        LOGGER.warn("Error in case of processing system environment, more details below: ", throwable);
     }
 
     private void cloudConfigError(Throwable throwable) {
-        logger.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
+        LOGGER.warn("Exception during getting configuration from CONSUL/CONFIG_BINDING_SERVICE ", throwable);
     }
 
     private void parsingConfigSuccess(EnvProperties envProperties) {
-        logger.info("Fetching PRH configuration from ConfigBindingService/Consul");
+        LOGGER.info("Fetching PRH configuration from ConfigBindingService/Consul");
         prhConfigurationProvider.callForPrhConfiguration(envProperties)
             .subscribe(this::parseCloudConfig, this::cloudConfigError);
     }
 
     private void parseCloudConfig(JsonObject jsonObject) {
-        logger.info("Received application configuration: {}", jsonObject);
+        LOGGER.info("Received application configuration: {}", jsonObject);
         CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
         dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
         aaiClientCloudConfiguration = cloudConfigParser.getAaiClientConfig();
index d3b6cbb..fdf6847 100644 (file)
 
 package org.onap.dcaegen2.services.prh.configuration;
 
-import java.util.Optional;
-import java.util.Properties;
 import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException;
 import org.onap.dcaegen2.services.prh.model.EnvProperties;
 import org.onap.dcaegen2.services.prh.model.ImmutableEnvProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Optional;
+import java.util.Properties;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
@@ -35,23 +36,23 @@ import reactor.core.publisher.Flux;
 class EnvironmentProcessor {
 
     private static final int DEFAULT_CONSUL_PORT = 8500;
-    private static Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class);
 
     private EnvironmentProcessor() {
     }
 
-    static Flux<EnvProperties> evaluate(Properties systemEnvironment) {
-        logger.info("Loading configuration from system environment variables {}", systemEnvironment);
+    static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
+        LOGGER.info("Loading configuration from system environment variables");
         EnvProperties envProperties;
         try {
             envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
                 .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
                 .appName(getService(systemEnvironment)).build();
         } catch (EnvironmentLoaderException e) {
-            return Flux.error(e);
+            return Mono.error(e);
         }
-        logger.info("Evaluated environment system variables {}", envProperties);
-        return Flux.just(envProperties);
+        LOGGER.info("Evaluated environment system variables {}", envProperties);
+        return Mono.just(envProperties);
     }
 
     private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
@@ -78,8 +79,8 @@ class EnvironmentProcessor {
     }
 
     private static Integer getDefaultPortOfConsul() {
-        logger.warn("$CONSUL_PORT environment has not been defined");
-        logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+        LOGGER.warn("$CONSUL_PORT environment has not been defined");
+        LOGGER.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
         return DEFAULT_CONSUL_PORT;
     }
 }
index 2fb61c0..9257441 100644 (file)
@@ -63,7 +63,7 @@ public abstract class PrhAppConfig implements Config {
     private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
     private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class);
 
     AaiClientConfiguration aaiClientConfiguration;
 
@@ -114,9 +114,9 @@ public abstract class PrhAppConfig implements Config {
                     DmaapPublisherConfiguration.class);
             }
         } catch (IOException e) {
-            logger.warn("Problem with file loading, file: {}", filepath, e);
+            LOGGER.warn("Problem with file loading, file: {}", filepath, e);
         } catch (JsonSyntaxException e) {
-            logger.warn("Problem with Json deserialization", e);
+            LOGGER.warn("Problem with Json deserialization", e);
         }
     }
 
index 6132a67..214d6db 100644 (file)
 package org.onap.dcaegen2.services.prh.configuration;
 
 import io.swagger.annotations.ApiOperation;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ScheduledFuture;
-import javax.annotation.PostConstruct;
 import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import reactor.core.publisher.Mono;
 
+import javax.annotation.PostConstruct;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 6/13/18
  */
@@ -50,24 +48,22 @@ import reactor.core.publisher.Mono;
 @EnableScheduling
 public class SchedulerConfig {
 
-    private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 5;
+    private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10;
     private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
+    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfig.class);
+    private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
     private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-    private final Marker ENTRY = MarkerFactory.getMarker("ENRTY");
 
-    private final ConcurrentTaskScheduler taskScheduler;
+    private final TaskScheduler taskScheduler;
     private final ScheduledTasks scheduledTask;
-    private final TaskScheduler cloudTaskScheduler;
     private final CloudConfiguration cloudConfiguration;
 
     @Autowired
-    public SchedulerConfig(@Qualifier("concurrentTaskScheduler") ConcurrentTaskScheduler concurrentTaskScheduler,
-        ScheduledTasks scheduledTask, ThreadPoolTaskScheduler cloudTaskScheduler,
-        CloudConfiguration cloudConfiguration) {
-        this.taskScheduler = concurrentTaskScheduler;
+    public SchedulerConfig(TaskScheduler taskScheduler,
+                           ScheduledTasks scheduledTask,
+                           CloudConfiguration cloudConfiguration) {
+        this.taskScheduler = taskScheduler;
         this.scheduledTask = scheduledTask;
-        this.cloudTaskScheduler = cloudTaskScheduler;
         this.cloudConfiguration = cloudConfiguration;
     }
 
@@ -94,9 +90,9 @@ public class SchedulerConfig {
     @PostConstruct
     @ApiOperation(value = "Start task if possible")
     public synchronized boolean tryToStartTask() {
-        logger.info(ENTRY,"Start scheduling PRH workflow");
+        LOGGER.info(ENTRY, "Start scheduling PRH workflow");
         if (scheduledPrhTaskFutureList.isEmpty()) {
-            scheduledPrhTaskFutureList.add(cloudTaskScheduler
+            scheduledPrhTaskFutureList.add(taskScheduler
                 .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
                     Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
             scheduledPrhTaskFutureList.add(taskScheduler
index 573724d..1b2f4a1 100644 (file)
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
 @Api(value = "HeartbeatController", description = "Check liveness of PRH service")
 public class HeartbeatController {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class);
 
     /**
      * Endpoint for checking that PRH is alive.
@@ -57,7 +57,7 @@ public class HeartbeatController {
     }
     )
     public Mono<ResponseEntity<String>> heartbeat() {
-        logger.trace("Receiving heartbeat request");
+        LOGGER.trace("Receiving heartbeat request");
         return Mono.defer(() ->
             Mono.just(new ResponseEntity<>("alive", HttpStatus.OK))
         );
index 270fa58..9386b9e 100644 (file)
@@ -40,7 +40,7 @@ import reactor.core.publisher.Mono;
 @Api(value = "ScheduleController", description = "Schedule Controller")
 public class ScheduleController {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduleController.class);
 
     private final SchedulerConfig schedulerConfig;
 
@@ -52,14 +52,14 @@ public class ScheduleController {
     @RequestMapping(value = "start", method = RequestMethod.GET)
     @ApiOperation(value = "Start scheduling worker request")
     public Mono<ResponseEntity<String>> startTasks() {
-        logger.trace("Receiving start scheduling worker request");
+        LOGGER.trace("Receiving start scheduling worker request");
         return Mono.fromSupplier(schedulerConfig::tryToStartTask).map(this::createStartTaskResponse);
     }
 
     @RequestMapping(value = "stopPrh", method = RequestMethod.GET)
     @ApiOperation(value = "Receiving stop scheduling worker request")
     public Mono<ResponseEntity<String>> stopTask() {
-        logger.trace("Receiving stop scheduling worker request");
+        LOGGER.trace("Receiving stop scheduling worker request");
         return schedulerConfig.getResponseFromCancellationOfTasks();
     }
 
index 8742d87..a5ecc1d 100644 (file)
@@ -23,17 +23,16 @@ package org.onap.dcaegen2.services.prh.service;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.util.StringUtils;
 import reactor.core.publisher.Mono;
 
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
  */
@@ -46,8 +45,6 @@ public class DmaapConsumerJsonParser {
     private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress";
     private static final String SOURCE_NAME = "sourceName";
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
     /**
      * Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}.
      *
@@ -56,19 +53,18 @@ public class DmaapConsumerJsonParser {
      */
     public Mono<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
         return monoMessage
-            .doOnNext(message -> logger.info("Consumed message from DmaaP: {}", message))
             .flatMap(this::getJsonParserMessage)
             .flatMap(this::createJsonConsumerModel);
     }
 
     private Mono<JsonElement> getJsonParserMessage(String message) {
         return StringUtils.isEmpty(message) ? Mono.error(new DmaapEmptyResponseException())
-            : Mono.fromSupplier(() -> new JsonParser().parse(message));
+            : Mono.fromCallable(() -> new JsonParser().parse(message));
     }
 
     private Mono<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
         return jsonElement.isJsonObject()
-            ? create(Mono.fromSupplier(jsonElement::getAsJsonObject))
+            ? create(Mono.fromCallable(jsonElement::getAsJsonObject))
             : getConsumerDmaapModelFromJsonArray(jsonElement);
     }
 
index 56ab484..4f66e25 100644 (file)
@@ -30,9 +30,9 @@ import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
 
-public class HttpGetClient {
+class HttpGetClient {
 
-    private static final Logger logger = LoggerFactory.getLogger(HttpGetClient.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(HttpGetClient.class);
 
     private final WebClient webClient;
     private final Gson gson;
@@ -41,12 +41,12 @@ public class HttpGetClient {
         this(WebClient.builder().filter(logRequest()).filter(logResponse()).build());
     }
 
-    HttpGetClient(WebClient webClient){
+    HttpGetClient(WebClient webClient) {
         this.webClient = webClient;
         this.gson = new Gson();
     }
 
-    public <T> Mono<T> callHttpGet(String url, Class<T> tClass) {
+    <T> Mono<T> callHttpGet(String url, Class<T> tClass) {
         return webClient
             .get()
             .uri(url)
@@ -54,7 +54,7 @@ public class HttpGetClient {
             .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(getException(response)))
             .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(getException(response)))
             .bodyToMono(String.class)
-            .flatMap(body->getJsonFromRequest(body,tClass));
+            .flatMap(body -> getJsonFromRequest(body, tClass));
     }
 
     private RuntimeException getException(ClientResponse response) {
@@ -66,27 +66,26 @@ public class HttpGetClient {
         try {
             return Mono.just(parseJson(body, tClass));
         } catch (JsonSyntaxException | IllegalStateException e) {
-            logger.warn("Converting string to json threw error ", e);
             return Mono.error(e);
         }
     }
 
-    private <T> T  parseJson(String body, Class<T> tClass){
+    private <T> T parseJson(String body, Class<T> tClass) {
         return gson.fromJson(body, tClass);
     }
 
     private static ExchangeFilterFunction logResponse() {
         return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
-            logger.info("Response status {}", clientResponse.statusCode());
+            LOGGER.info("Response status {}", clientResponse.statusCode());
             return Mono.just(clientResponse);
         });
     }
 
     private static ExchangeFilterFunction logRequest() {
         return ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
-            logger.info("Request: {} {}", clientRequest.method(), clientRequest.url());
+            LOGGER.info("Request: {} {}", clientRequest.method(), clientRequest.url());
             clientRequest.headers()
-                .forEach((name, values) -> values.forEach(value -> logger.info("{}={}", name, value)));
+                .forEach((name, values) -> values.forEach(value -> LOGGER.info("{}={}", name, value)));
             return Mono.just(clientRequest);
         });
     }
index 7af4a7c..b346bf5 100644 (file)
@@ -38,7 +38,7 @@ import java.net.URISyntaxException;
 @Service
 public class PrhConfigurationProvider {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger LOGGER = LoggerFactory.getLogger(PrhConfigurationProvider.class);
 
     private final HttpGetClient httpGetClient;
 
@@ -56,12 +56,12 @@ public class PrhConfigurationProvider {
     }
 
     private Mono<String> callConsulForConfigBindingServiceEndpoint(EnvProperties envProperties) {
-        logger.info("Retrieving Config Binding Service endpoint from Consul");
+        LOGGER.info("Retrieving Config Binding Service endpoint from Consul");
         try {
             return httpGetClient.callHttpGet(getConsulUrl(envProperties), JsonArray.class)
                 .flatMap(jsonArray -> this.createConfigBindingServiceURL(jsonArray, envProperties.appName()));
         } catch (URISyntaxException e) {
-            logger.warn("Malformed Consul uri", e);
+            LOGGER.warn("Malformed Consul uri", e);
             return Mono.error(e);
         }
     }
@@ -72,7 +72,7 @@ public class PrhConfigurationProvider {
     }
 
     private Mono<JsonObject> callConfigBindingServiceForPrhConfiguration(String configBindingServiceUri) {
-        logger.info("Retrieving PRH configuration");
+        LOGGER.info("Retrieving PRH configuration");
         return httpGetClient.callHttpGet(configBindingServiceUri, JsonObject.class);
     }
 
@@ -86,7 +86,7 @@ public class PrhConfigurationProvider {
             return Mono.just(getUri(jsonObject.get("ServiceAddress").getAsString(),
                 jsonObject.get("ServicePort").getAsInt(), "/service_component", appName));
         } catch (URISyntaxException e) {
-            logger.warn("Malformed Config Binding Service uri", e);
+            LOGGER.warn("Malformed Config Binding Service uri", e);
             return Mono.error(e);
         }
     }
@@ -99,7 +99,7 @@ public class PrhConfigurationProvider {
                 throw new IllegalStateException("JSON Array was empty");
             }
         } catch (IllegalStateException e) {
-            logger.warn("Failed to retrieve JSON Object from array", e);
+            LOGGER.warn("Failed to retrieve JSON Object from array", e);
             return Mono.error(e);
         }
     }
index f58fed6..5a05d37 100644 (file)
@@ -29,21 +29,23 @@ import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpCl
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
 
+import javax.net.ssl.SSLException;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
 public abstract class AaiProducerTask {
 
-    abstract Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> message) throws AaiNotFoundException;
+    abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException;
 
-    abstract AaiProducerReactiveHttpClient resolveClient();
+    abstract AaiProducerReactiveHttpClient resolveClient() throws SSLException;
 
     protected abstract AaiClientConfiguration resolveConfiguration();
 
-    protected abstract Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel)
-        throws PrhTaskException;
+    protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
+        throws PrhTaskException, SSLException;
 
-    WebClient buildWebClient() {
+    WebClient buildWebClient() throws SSLException {
         return new AaiReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
     }
 }
index f5b8307..7ccf75a 100644 (file)
@@ -36,6 +36,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
 
+import javax.net.ssl.SSLException;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
@@ -43,9 +45,8 @@ import reactor.core.publisher.Mono;
 public class AaiProducerTaskImpl extends
     AaiProducerTask {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-    private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
-
+    private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+    private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class);
 
     private final Config config;
     private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
@@ -56,12 +57,12 @@ public class AaiProducerTaskImpl extends
     }
 
     @Override
-    Mono<ConsumerDmaapModel> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
-
+    Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) {
+        LOGGER.info("Publish to AAI DmaapModel");
         return aaiProducerReactiveHttpClient.getAaiProducerResponse(consumerDmaapModel)
             .flatMap(response -> {
-                if (HttpUtils.isSuccessfulResponseCode(response)) {
-                    return consumerDmaapModel;
+                if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) {
+                    return Mono.just(consumerDmaapModel);
                 }
                 return Mono
                     .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow"));
@@ -69,8 +70,8 @@ public class AaiProducerTaskImpl extends
     }
 
     @Override
-    AaiProducerReactiveHttpClient resolveClient() {
-        return new AaiProducerReactiveHttpClient(resolveConfiguration());
+    AaiProducerReactiveHttpClient resolveClient() throws SSLException {
+        return new AaiProducerReactiveHttpClient(resolveConfiguration()).createAaiWebClient(buildWebClient());
     }
 
     @Override
@@ -79,12 +80,13 @@ public class AaiProducerTaskImpl extends
     }
 
     @Override
-    protected Mono<ConsumerDmaapModel> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException {
+    protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
+        throws PrhTaskException, SSLException {
         if (consumerDmaapModel == null) {
             throw new DmaapNotFoundException("Invoked null object to DMaaP task");
         }
         aaiProducerReactiveHttpClient = resolveClient();
-        logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
+        LOGGER.debug(INVOKE, "Method called with arg {}", consumerDmaapModel);
         return publish(consumerDmaapModel);
 
     }
index a912ca9..d322a43 100644 (file)
@@ -21,7 +21,6 @@
 package org.onap.dcaegen2.services.prh.tasks;
 
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
 import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
@@ -33,7 +32,7 @@ import reactor.core.publisher.Mono;
  */
 abstract class DmaapConsumerTask {
 
-    abstract Mono<ConsumerDmaapModel> consume(Mono<String> message) throws PrhTaskException;
+    abstract Mono<ConsumerDmaapModel> consume(Mono<String> message);
 
     abstract DMaaPConsumerReactiveHttpClient resolveClient();
 
@@ -41,7 +40,7 @@ abstract class DmaapConsumerTask {
 
     protected abstract DmaapConsumerConfiguration resolveConfiguration();
 
-    protected abstract Mono<ConsumerDmaapModel> execute(String object) throws PrhTaskException;
+    protected abstract Mono<ConsumerDmaapModel> execute(String object);
 
     WebClient buildWebClient() {
         return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
index 9e1fadf..0d4be08 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import java.util.Map;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
 import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
 import org.onap.dcaegen2.services.prh.service.consumer.DMaaPConsumerReactiveHttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -43,11 +40,11 @@ import reactor.core.publisher.Mono;
 @Component
 public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-    private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+    private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
     private final Config config;
     private DmaapConsumerJsonParser dmaapConsumerJsonParser;
-    private DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient;
 
     @Autowired
     public DmaapConsumerTaskImpl(Config config) {
@@ -67,8 +64,8 @@ public class DmaapConsumerTaskImpl extends DmaapConsumerTask {
 
     @Override
     public Mono<ConsumerDmaapModel> execute(String object) {
-        dmaaPConsumerReactiveHttpClient = resolveClient();
-        logger.info(INVOKE, "Method called with arg {}", object);
+        DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
+        LOGGER.debug(INVOKE, "Method called with arg {}", object);
         return consume(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
     }
 
index 9a5813d..7a121d5 100644 (file)
@@ -23,9 +23,9 @@ package org.onap.dcaegen2.services.prh.tasks;
 import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.service.DMaaPReactiveWebClient;
 import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
 import reactor.core.publisher.Mono;
 
 /**
@@ -33,15 +33,14 @@ import reactor.core.publisher.Mono;
  */
 abstract class DmaapPublisherTask {
 
-    abstract Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+    abstract Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
 
     abstract DMaaPProducerReactiveHttpClient resolveClient();
 
     protected abstract DmaapPublisherConfiguration resolveConfiguration();
 
-    protected abstract Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws PrhTaskException;
+    protected abstract Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel)
+        throws PrhTaskException;
 
-    WebClient buildWebClient() {
-        return new DMaaPReactiveWebClient().fromConfiguration(resolveConfiguration()).build();
-    }
+    abstract RestTemplate buildWebClient();
 }
index 7326038..733b865 100644 (file)
@@ -30,7 +30,9 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
 import reactor.core.publisher.Mono;
 
 /**
@@ -39,8 +41,8 @@ import reactor.core.publisher.Mono;
 @Component
 public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-    private final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+    private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+    private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
     private final Config config;
     private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
 
@@ -50,24 +52,25 @@ public class DmaapPublisherTaskImpl extends DmaapPublisherTask {
     }
 
     @Override
-    Mono<String> publish(Mono<ConsumerDmaapModel> consumerDmaapModel) {
-        return consumerDmaapModel.flatMap(dmaapModel -> {
-            logger.info("Publishing on DMaaP topic {} object {}", resolveConfiguration().dmaapTopicName(),
-                dmaapModel);
-            return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(dmaapModel);
-        });
+    Mono<ResponseEntity<String>> publish(ConsumerDmaapModel consumerDmaapModel) {
+        return dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
     }
 
     @Override
-    public Mono<String> execute(Mono<ConsumerDmaapModel> consumerDmaapModel) throws DmaapNotFoundException {
+    public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
         if (consumerDmaapModel == null) {
             throw new DmaapNotFoundException("Invoked null object to DMaaP task");
         }
         dmaapProducerReactiveHttpClient = resolveClient();
-        logger.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
+        LOGGER.info(INVOKE, "Method called with arg {}", consumerDmaapModel);
         return publish(consumerDmaapModel);
     }
 
+    @Override
+    RestTemplate buildWebClient() {
+        return new RestTemplate();
+    }
+
     @Override
     protected DmaapPublisherConfiguration resolveConfiguration() {
         return config.getDmaapPublisherConfiguration();
index 6432a33..f74bc56 100644 (file)
@@ -25,7 +25,8 @@ import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.RESPONSE
 
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.net.ssl.SSLException;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -33,12 +34,10 @@ import org.onap.dcaegen2.services.prh.model.logging.MDCVariables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import org.slf4j.Marker;
-import org.slf4j.MarkerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -46,7 +45,8 @@ import reactor.core.scheduler.Schedulers;
 @Component
 public class ScheduledTasks {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+
     private final DmaapConsumerTask dmaapConsumerTask;
     private final DmaapPublisherTask dmaapProducerTask;
     private final AaiProducerTask aaiProducerTask;
@@ -72,24 +72,33 @@ public class ScheduledTasks {
      */
     public void scheduleMainPrhEventTask() {
         MDCVariables.setMdcContextMap(contextMap);
-        logger.trace("Execution of tasks was registered");
-
-        Mono<String> dmaapProducerResponse = Mono.fromCallable(consumeFromDMaaPMessage())
-            .doOnError(DmaapEmptyResponseException.class, error -> logger.warn("Nothing to consume from DMaaP"))
-            .map(this::publishToAaiConfiguration)
-            .flatMap(this::publishToDmaapConfiguration)
-            .subscribeOn(Schedulers.elastic());
+        try {
+            logger.trace("Execution of tasks was registered");
+            CountDownLatch mainCountDownLatch = new CountDownLatch(1);
+            consumeFromDMaaPMessage()
+                .doOnError(DmaapEmptyResponseException.class, error ->
+                    logger.warn("Nothing to consume from DMaaP")
+                )
+                .flatMap(this::publishToAaiConfiguration)
+                .flatMap(this::publishToDmaapConfiguration)
+                .doOnTerminate(mainCountDownLatch::countDown)
+                .subscribe(this::onSuccess, this::onError, this::onComplete);
 
-        dmaapProducerResponse.subscribe(this::onSuccess, this::onError, this::onComplete);
+            mainCountDownLatch.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
+
     private void onComplete() {
         logger.info("PRH tasks have been completed");
     }
 
-    private void onSuccess(String responseCode) {
-        MDC.put(RESPONSE_CODE, responseCode);
-        logger.info("Prh consumed tasks. HTTP Response code {}", responseCode);
+    private void onSuccess(ResponseEntity<String> responseCode) {
+        MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
+        logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+            responseCode.getStatusCode().value());
     }
 
     private void onError(Throwable throwable) {
@@ -98,24 +107,26 @@ public class ScheduledTasks {
         }
     }
 
-    private Callable<Mono<ConsumerDmaapModel>> consumeFromDMaaPMessage() {
-        return () -> {
+
+    private Mono<ConsumerDmaapModel> consumeFromDMaaPMessage() {
+        return Mono.defer(() -> {
             MDCVariables.setMdcContextMap(contextMap);
             MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
+            logger.info("Init configs");
             dmaapConsumerTask.initConfigs();
             return dmaapConsumerTask.execute("");
-        };
+        });
     }
 
-    private Mono<ConsumerDmaapModel> publishToAaiConfiguration(Mono<ConsumerDmaapModel> monoDMaaPModel) {
+    private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) {
         try {
             return aaiProducerTask.execute(monoDMaaPModel);
-        } catch (PrhTaskException e) {
+        } catch (PrhTaskException | SSLException e) {
             return Mono.error(e);
         }
     }
 
-    private Mono<String> publishToDmaapConfiguration(Mono<ConsumerDmaapModel> monoAaiModel) {
+    private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
         try {
             return dmaapProducerTask.execute(monoAaiModel);
         } catch (PrhTaskException e) {
index ac0192c..e343a36 100644 (file)
@@ -9,6 +9,9 @@ logging.level.root=ERROR
 logging.level.org.springframework=ERROR
 logging.level.org.springframework.data=ERROR
 logging.level.org.onap.dcaegen2.services.prh=INFO
+logging.level.org.springframework.web.reactive=WARN
+logging.level.reactor.ipc.netty.http.client=WARN
 app.filepath=config/prh_endpoints.json
 app.xonaprequestid=requestID
 app.xinvocationid=invocationID
+
index 5425939..f5cc6b2 100644 (file)
@@ -29,8 +29,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import javax.net.ssl.SSLException;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
@@ -40,6 +41,8 @@ import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.ClientResponse;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -56,14 +59,16 @@ class AaiProducerTaskImplTest {
     private static final String BASE_PATH = "/aai/v11";
     private static final String PNF_PATH = "/network/pnfs/pnf";
 
-    private static ConsumerDmaapModel consumerDmaapModel;
-    private static AaiProducerTaskImpl aaiProducerTask;
-    private static AaiClientConfiguration aaiClientConfiguration;
-    private static AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
-    private static AppConfig appConfig;
+    private ConsumerDmaapModel consumerDmaapModel;
+    private AaiProducerTaskImpl aaiProducerTask;
+    private AaiClientConfiguration aaiClientConfiguration;
+    private AaiProducerReactiveHttpClient aaiProducerReactiveHttpClient;
+    private AppConfig appConfig;
+    private ClientResponse clientResponse;
 
-    @BeforeAll
-    static void setUp() {
+    @BeforeEach
+    void setUp() {
+        clientResponse = mock(ClientResponse.class);
         aaiClientConfiguration = new ImmutableAaiClientConfiguration.Builder()
             .aaiHost(AAI_HOST)
             .aaiPort(PORT)
@@ -81,17 +86,6 @@ class AaiProducerTaskImplTest {
 
     }
 
-    private static void getAaiProducerTask_whenMockingResponseObject(Integer statusCode) {
-        //given
-        aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class);
-        when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any()))
-            .thenReturn(Mono.just(statusCode));
-        when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration);
-        aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig));
-        when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration);
-        doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient();
-    }
-
     @Test
     void whenPassedObjectDoesntFit_ThrowsPrhTaskException() {
         //given/when/
@@ -105,10 +99,10 @@ class AaiProducerTaskImplTest {
     }
 
     @Test
-    void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException {
+    void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException, SSLException {
         //given/when
         getAaiProducerTask_whenMockingResponseObject(200);
-        Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(Mono.just(consumerDmaapModel));
+        Mono<ConsumerDmaapModel> response = aaiProducerTask.execute(consumerDmaapModel);
 
         //then
         verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any());
@@ -118,13 +112,26 @@ class AaiProducerTaskImplTest {
     }
 
     @Test
-    void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException {
+    void whenPassedObjectFits_butIncorrectResponseReturns() throws PrhTaskException, SSLException {
         //given/when
         getAaiProducerTask_whenMockingResponseObject(400);
-        StepVerifier.create(aaiProducerTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
+        StepVerifier.create(aaiProducerTask.execute(consumerDmaapModel)).expectSubscription()
             .expectError(PrhTaskException.class).verify();
         //then
         verify(aaiProducerReactiveHttpClient, times(1)).getAaiProducerResponse(any());
         verifyNoMoreInteractions(aaiProducerReactiveHttpClient);
     }
+
+    private void getAaiProducerTask_whenMockingResponseObject(int statusCode) throws SSLException {
+        //given
+        doReturn(HttpStatus.valueOf(statusCode)).when(clientResponse).statusCode();
+        Mono<ClientResponse> clientResponseMono = Mono.just(clientResponse);
+        aaiProducerReactiveHttpClient = mock(AaiProducerReactiveHttpClient.class);
+        when(aaiProducerReactiveHttpClient.getAaiProducerResponse(any()))
+            .thenReturn(clientResponseMono);
+        when(appConfig.getAaiClientConfiguration()).thenReturn(aaiClientConfiguration);
+        aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig));
+        when(aaiProducerTask.resolveConfiguration()).thenReturn(aaiClientConfiguration);
+        doReturn(aaiProducerReactiveHttpClient).when(aaiProducerTask).resolveClient();
+    }
 }
\ No newline at end of file
index 82dcdae..231bf14 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
 import org.onap.dcaegen2.services.prh.config.AaiClientConfiguration;
 import org.onap.dcaegen2.services.prh.configuration.AppConfig;
 import org.onap.dcaegen2.services.prh.service.producer.AaiProducerReactiveHttpClient;
@@ -31,6 +27,10 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Primary;
 
+import javax.net.ssl.SSLException;
+
+import static org.mockito.Mockito.*;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
  */
@@ -44,7 +44,7 @@ public class AaiPublisherTaskSpy {
      */
     @Bean
     @Primary
-    public AaiProducerTask registerSimpleAaiPublisherTask() {
+    public AaiProducerTask registerSimpleAaiPublisherTask() throws SSLException {
         AppConfig appConfig = spy(AppConfig.class);
         doReturn(mock(AaiClientConfiguration.class)).when(appConfig).getAaiClientConfiguration();
         AaiProducerTaskImpl aaiProducerTask = spy(new AaiProducerTaskImpl(appConfig));
index 453679d..ae7b8e7 100644 (file)
 
 package org.onap.dcaegen2.services.prh.tasks;
 
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
@@ -42,9 +32,14 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.service.producer.DMaaPProducerReactiveHttpClient;
 import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/17/18
  */
@@ -84,15 +79,16 @@ class DmaapPublisherTaskImplTest {
     @Test
     void whenPassedObjectFits_ReturnsCorrectStatus() throws PrhTaskException {
         //given
-        prepareMocksForTests(HttpStatus.OK.value());
+        ResponseEntity<String> responseEntity = prepareMocksForTests(HttpStatus.OK.value());
 
         //when
-        StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
-            .expectNext(HttpStatus.OK.toString()).verifyComplete();
+        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.OK);
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription()
+            .expectNext(responseEntity).verifyComplete();
 
         //then
         verify(dMaaPProducerReactiveHttpClient, times(1))
-            .getDMaaPProducerResponse(any());
+            .getDMaaPProducerResponse(consumerDmaapModel);
         verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
 
@@ -100,24 +96,30 @@ class DmaapPublisherTaskImplTest {
     @Test
     void whenPassedObjectFits_butIncorrectResponseReturns() throws DmaapNotFoundException {
         //given
-        prepareMocksForTests(HttpStatus.UNAUTHORIZED.value());
+        ResponseEntity<String> responseEntity = prepareMocksForTests(HttpStatus.UNAUTHORIZED.value());
 
         //when
-        StepVerifier.create(dmaapPublisherTask.execute(Mono.just(consumerDmaapModel))).expectSubscription()
-            .expectNext(String.valueOf(HttpStatus.UNAUTHORIZED.value())).verifyComplete();
+        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.UNAUTHORIZED);
+        StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel)).expectSubscription()
+            .expectNext(responseEntity).verifyComplete();
 
         //then
-        verify(dMaaPProducerReactiveHttpClient, times(1)).getDMaaPProducerResponse(any());
+        verify(dMaaPProducerReactiveHttpClient, times(1))
+            .getDMaaPProducerResponse(consumerDmaapModel);
         verifyNoMoreInteractions(dMaaPProducerReactiveHttpClient);
     }
 
 
-    private void prepareMocksForTests(Integer httpResponseCode) {
+    private ResponseEntity<String> prepareMocksForTests(Integer httpResponseCode) {
+        ResponseEntity<String> responseEntity = mock(ResponseEntity.class);
+        //when
+        when(responseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(httpResponseCode));
         dMaaPProducerReactiveHttpClient = mock(DMaaPProducerReactiveHttpClient.class);
         when(dMaaPProducerReactiveHttpClient.getDMaaPProducerResponse(any()))
-            .thenReturn(Mono.just(httpResponseCode.toString()));
+            .thenReturn(Mono.just(responseEntity));
         dmaapPublisherTask = spy(new DmaapPublisherTaskImpl(appConfig));
         when(dmaapPublisherTask.resolveConfiguration()).thenReturn(dmaapPublisherConfiguration);
         doReturn(dMaaPProducerReactiveHttpClient).when(dmaapPublisherTask).resolveClient();
+        return responseEntity;
     }
 }
\ No newline at end of file
index 145d917..83a078d 100644 (file)
@@ -42,4 +42,4 @@ public class CommonFunctions {
         return gsonBuilder.create().toJson(ImmutableConsumerDmaapModel.builder().ipv4(consumerDmaapModel.getIpv4())
             .ipv6(consumerDmaapModel.getIpv6()).sourceName(consumerDmaapModel.getSourceName()).build());
     }
-}
+}
\ No newline at end of file
index 8ce8175..4327dfb 100644 (file)
 
 package org.onap.dcaegen2.services.prh.service;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.onap.dcaegen2.services.prh.config.DmaapCustomConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import org.springframework.http.HttpHeaders;
 import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
@@ -40,7 +37,6 @@ public class DMaaPReactiveWebClient {
 
     private String dmaaPUserName;
     private String dmaaPUserPassword;
-    private String dmaaPContentType;
 
     /**
      * Creating DMaaPReactiveWebClient passing to them basic DMaaPConfig.
@@ -51,8 +47,6 @@ public class DMaaPReactiveWebClient {
     public DMaaPReactiveWebClient fromConfiguration(DmaapCustomConfig dmaapCustomConfig) {
         this.dmaaPUserName = dmaapCustomConfig.dmaapUserName();
         this.dmaaPUserPassword = dmaapCustomConfig.dmaapUserPassword();
-        this.dmaaPContentType = dmaapCustomConfig.dmaapContentType();
-
         return this;
     }
 
@@ -63,7 +57,6 @@ public class DMaaPReactiveWebClient {
      */
     public WebClient build() {
         return WebClient.builder()
-            .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaaPContentType)
             .filter(logRequest())
             .filter(logResponse())
             .build();
index ac13dd6..f9a6637 100644 (file)
@@ -29,9 +29,8 @@ import java.net.URISyntaxException;
 import java.util.UUID;
 import org.apache.http.client.utils.URIBuilder;
 import org.onap.dcaegen2.services.prh.config.DmaapConsumerConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.reactive.function.client.WebClient;
 import reactor.core.publisher.Mono;
@@ -41,13 +40,13 @@ import reactor.core.publisher.Mono;
  */
 public class DMaaPConsumerReactiveHttpClient {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
     private final String dmaapHostName;
     private final String dmaapProtocol;
     private final Integer dmaapPortNumber;
     private final String dmaapTopicName;
     private final String consumerGroup;
     private final String consumerId;
+    private final String contentType;
     private WebClient webClient;
 
     /**
@@ -62,6 +61,7 @@ public class DMaaPConsumerReactiveHttpClient {
         this.dmaapTopicName = consumerConfiguration.dmaapTopicName();
         this.consumerGroup = consumerConfiguration.consumerGroup();
         this.consumerId = consumerConfiguration.consumerId();
+        this.contentType = consumerConfiguration.dmaapContentType();
     }
 
     /**
@@ -76,15 +76,15 @@ public class DMaaPConsumerReactiveHttpClient {
                 .uri(getUri())
                 .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
                 .header(X_INVOCATION_ID, UUID.randomUUID().toString())
+                .header(HttpHeaders.CONTENT_TYPE, contentType)
                 .retrieve()
                 .onStatus(HttpStatus::is4xxClientError, clientResponse ->
-                    Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode()))
+                    Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode()))
                 )
                 .onStatus(HttpStatus::is5xxServerError, clientResponse ->
-                    Mono.error(new Exception("DmaaPConsumer HTTP " + clientResponse.statusCode())))
+                    Mono.error(new RuntimeException("DmaaPConsumer HTTP " + clientResponse.statusCode())))
                 .bodyToMono(String.class);
         } catch (URISyntaxException e) {
-            logger.warn("Exception while evaluating URI ");
             return Mono.error(e);
         }
     }
index d049d38..5c72b38 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.onap.dcaegen2.services.prh.service.producer;
 
+import static org.onap.dcaegen2.services.prh.model.CommonFunctions.createJsonBody;
 import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.REQUEST_ID;
 import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_INVOCATION_ID;
 import static org.onap.dcaegen2.services.prh.model.logging.MDCVariables.X_ONAP_REQUEST_ID;
@@ -33,9 +34,11 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
 import reactor.core.publisher.Mono;
 
 /**
@@ -44,11 +47,13 @@ import reactor.core.publisher.Mono;
 public class DMaaPProducerReactiveHttpClient {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
     private final String dmaapHostName;
     private final Integer dmaapPortNumber;
     private final String dmaapProtocol;
     private final String dmaapTopicName;
-    private WebClient webClient;
+    private final String dmaapContentType;
+    private RestTemplate restTemplate;
 
     /**
      * Constructor DMaaPProducerReactiveHttpClient.
@@ -60,6 +65,7 @@ public class DMaaPProducerReactiveHttpClient {
         this.dmaapProtocol = dmaapPublisherConfiguration.dmaapProtocol();
         this.dmaapPortNumber = dmaapPublisherConfiguration.dmaapPortNumber();
         this.dmaapTopicName = dmaapPublisherConfiguration.dmaapTopicName();
+        this.dmaapContentType = dmaapPublisherConfiguration.dmaapContentType();
     }
 
     /**
@@ -68,29 +74,30 @@ public class DMaaPProducerReactiveHttpClient {
      * @param consumerDmaapModelMono - object which will be sent to DMaaP
      * @return status code of operation
      */
-    public Mono<String> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
-        try {
-            return webClient
-                .post()
-                .uri(getUri())
-                .header(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID))
-                .header(X_INVOCATION_ID, UUID.randomUUID().toString())
-                .body(BodyInserters.fromObject(consumerDmaapModelMono))
-                .retrieve()
-                .onStatus(HttpStatus::is4xxClientError, clientResponse ->
-                    Mono.error(new Exception("DmaapProducer HTTP" + clientResponse.statusCode()))
-                )
-                .onStatus(HttpStatus::is5xxServerError, clientResponse ->
-                    Mono.error(new Exception("DmaapProducer HTTP " + clientResponse.statusCode())))
-                .bodyToMono(String.class);
-        } catch (URISyntaxException e) {
-            logger.warn("Exception while evaluating URI");
-            return Mono.error(e);
-        }
+
+    public Mono<ResponseEntity<String>> getDMaaPProducerResponse(ConsumerDmaapModel consumerDmaapModelMono) {
+        return Mono.defer(() -> {
+            try {
+                HttpEntity<String> request = new HttpEntity<>(createJsonBody(consumerDmaapModelMono), getAllHeaders());
+                return Mono.just(restTemplate.exchange(getUri(), HttpMethod.POST, request, String.class));
+            } catch (URISyntaxException e) {
+                logger.warn("Exception while evaluating URI");
+                return Mono.error(e);
+            }
+        });
+    }
+
+    private HttpHeaders getAllHeaders() {
+        HttpHeaders headers = new HttpHeaders();
+        headers.set(X_ONAP_REQUEST_ID, MDC.get(REQUEST_ID));
+        headers.set(X_INVOCATION_ID, UUID.randomUUID().toString());
+        headers.set(HttpHeaders.CONTENT_TYPE, dmaapContentType);
+        return headers;
+
     }
 
-    public DMaaPProducerReactiveHttpClient createDMaaPWebClient(WebClient webClient) {
-        this.webClient = webClient;
+    public DMaaPProducerReactiveHttpClient createDMaaPWebClient(RestTemplate restTemplate) {
+        this.restTemplate = restTemplate;
         return this;
     }
 
index 1a23756..9f69370 100644 (file)
@@ -29,6 +29,7 @@ import static org.springframework.web.reactive.function.client.ExchangeFilterFun
 
 import java.net.URI;
 import java.net.URISyntaxException;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -110,6 +111,12 @@ class DMaaPConsumerReactiveHttpClientTest {
             .expectError(Exception.class).verify();
     }
 
+    @Test
+    void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+        Assertions.assertEquals(dmaapConsumerReactiveHttpClient.getUri(),
+            URI.create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12"));
+    }
+
     private void mockDependantObjects() {
         when(webClient.get()).thenReturn(requestHeadersSpec);
         when(requestHeadersSpec.uri((URI) any())).thenReturn(requestHeadersSpec);
index e8af8cd..05b7489 100644 (file)
@@ -25,27 +25,27 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
-import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.onap.dcaegen2.services.prh.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModelForUnitTest;
-import org.springframework.http.HttpHeaders;
-import org.springframework.web.reactive.function.client.WebClient;
-import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
-import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
-import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
-import reactor.core.publisher.Mono;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
 import reactor.test.StepVerifier;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 7/4/18
  */
+
 class DMaaPProducerReactiveHttpClientTest {
 
     private DMaaPProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
@@ -53,9 +53,6 @@ class DMaaPProducerReactiveHttpClientTest {
     private DmaapPublisherConfiguration dmaapPublisherConfigurationMock = mock(
         DmaapPublisherConfiguration.class);
     private ConsumerDmaapModel consumerDmaapModel = new ConsumerDmaapModelForUnitTest();
-    private WebClient webClient = mock(WebClient.class);
-    private RequestBodyUriSpec requestBodyUriSpec;
-    private ResponseSpec responseSpec;
 
 
     @BeforeEach
@@ -66,33 +63,26 @@ class DMaaPProducerReactiveHttpClientTest {
         when(dmaapPublisherConfigurationMock.dmaapUserName()).thenReturn("PRH");
         when(dmaapPublisherConfigurationMock.dmaapUserPassword()).thenReturn("PRH");
         when(dmaapPublisherConfigurationMock.dmaapContentType()).thenReturn("application/json");
-        when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("pnfReady");
-
+        when(dmaapPublisherConfigurationMock.dmaapTopicName()).thenReturn("unauthenticated.PNF_READY");
         dmaapProducerReactiveHttpClient = new DMaaPProducerReactiveHttpClient(dmaapPublisherConfigurationMock);
 
-        webClient = spy(WebClient.builder()
-            .defaultHeader(HttpHeaders.CONTENT_TYPE, dmaapPublisherConfigurationMock.dmaapContentType())
-            .filter(basicAuthentication(dmaapPublisherConfigurationMock.dmaapUserName(),
-                dmaapPublisherConfigurationMock.dmaapUserPassword()))
-            .build());
-        requestBodyUriSpec = mock(RequestBodyUriSpec.class);
-        responseSpec = mock(ResponseSpec.class);
     }
 
     @Test
     void getHttpResponse_Success() {
         //given
-        Integer responseSuccess = 200;
-        Mono<Integer> expectedResult = Mono.just(responseSuccess);
-
+        int responseSuccess = 200;
+        ResponseEntity<String> mockedResponseEntity = mock(ResponseEntity.class);
+        RestTemplate restTemplate = mock(RestTemplate.class);
         //when
-        mockWebClientDependantObject();
-        doReturn(expectedResult).when(responseSpec).bodyToMono(String.class);
-        dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
-        Mono<String> response = dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
+        when(mockedResponseEntity.getStatusCode()).thenReturn(HttpStatus.valueOf(responseSuccess));
+        doReturn(mockedResponseEntity).when(restTemplate)
+            .exchange(any(URI.class), any(HttpMethod.class), any(HttpEntity.class), (Class<Object>) any());
+        dmaapProducerReactiveHttpClient.createDMaaPWebClient(restTemplate);
 
         //then
-        Assertions.assertEquals(response.block(), expectedResult.block());
+        StepVerifier.create(dmaapProducerReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel))
+            .expectSubscription().expectNext(mockedResponseEntity).verifyComplete();
     }
 
     @Test
@@ -100,8 +90,6 @@ class DMaaPProducerReactiveHttpClientTest {
         //given
         dmaapProducerReactiveHttpClient = spy(dmaapProducerReactiveHttpClient);
         //when
-        when(webClient.post()).thenReturn(requestBodyUriSpec);
-        dmaapProducerReactiveHttpClient.createDMaaPWebClient(webClient);
         when(dmaapProducerReactiveHttpClient.getUri()).thenThrow(URISyntaxException.class);
 
         //then
@@ -109,13 +97,9 @@ class DMaaPProducerReactiveHttpClientTest {
             .expectError(Exception.class).verify();
     }
 
-    private void mockWebClientDependantObject() {
-        RequestHeadersSpec requestHeadersSpec = mock(RequestHeadersSpec.class);
-        when(webClient.post()).thenReturn(requestBodyUriSpec);
-        when(requestBodyUriSpec.uri((URI) any())).thenReturn(requestBodyUriSpec);
-        when(requestBodyUriSpec.header(any(), any())).thenReturn(requestBodyUriSpec);
-        when(requestBodyUriSpec.body(any())).thenReturn(requestHeadersSpec);
-        doReturn(responseSpec).when(requestHeadersSpec).retrieve();
-        doReturn(responseSpec).when(responseSpec).onStatus(any(), any());
+    @Test
+    void getAppropriateUri_whenPassingCorrectedPathForPnf() throws URISyntaxException {
+        Assertions.assertEquals(dmaapProducerReactiveHttpClient.getUri(),
+            URI.create("https://54.45.33.2:1234/unauthenticated.PNF_READY"));
     }
 }
\ No newline at end of file