Fix runtime 54/83354/1
authorMarcin Migdal <marcin.migdal@nokia.com>
Tue, 26 Mar 2019 13:53:00 +0000 (14:53 +0100)
committerMarcin Migdal <marcin.migdal@nokia.com>
Tue, 26 Mar 2019 13:53:00 +0000 (14:53 +0100)
Change-Id: Ib2c3971fcd8743e6a681ac2306d8da900879f177
Issue-ID: DCAEGEN2-1361
Signed-off-by: Marcin Migdal <marcin.migdal@nokia.com>
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java

index ec8ffaf..e2a91f7 100644 (file)
@@ -26,13 +26,22 @@ import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
 import reactor.netty.http.client.HttpClientResponse;
 import reactor.core.publisher.Mono;
+import org.apache.http.HttpResponse;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
  */
 interface DmaapPublisherTask {
 
+    /**
+     *
+     * Does not work reactive version with DMaaP MR  - to be investigated why in future
+     * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+     * */
+    @Deprecated
     Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException;
 
+    Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+
     DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;;
 }
index c25528b..1a9abf0 100644 (file)
@@ -22,22 +22,24 @@ package org.onap.dcaegen2.services.prh.tasks;
 
 import java.util.Optional;
 import javax.net.ssl.SSLException;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
 import org.onap.dcaegen2.services.prh.configuration.Config;
 import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
 import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
 import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-
+import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import reactor.netty.http.client.HttpClientResponse;
 import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClientResponse;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -46,13 +48,14 @@ import reactor.core.publisher.Mono;
 public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
+    private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl();
     private Config config;
-
     private final PublisherReactiveHttpClientFactory httpClientFactory;
 
     @Autowired
     public DmaapPublisherTaskImpl(Config config) {
-        this(config, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),new PnfReadyJsonBodyBuilderImpl()));
+        this(config,
+            new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), new PnfReadyJsonBodyBuilderImpl()));
     }
 
     DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) {
@@ -61,18 +64,55 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
     }
 
     @Override
-    public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException,SSLException {
+    public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel)
+        throws DmaapNotFoundException, SSLException {
         if (consumerDmaapModel == null) {
             throw new DmaapNotFoundException("Invoked null object to DMaaP task");
         }
         DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
         LOGGER.info("Method called with arg {}", consumerDmaapModel);
-        return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel,Optional.empty());
+        return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty());
+    }
+
+
+    @Override
+    public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException {
+        return httpClientFactory.create(config.getDmaapPublisherConfiguration());
+
     }
 
+    /**
+     *
+     * Does not work reactive version with DMaaP MR  - to be investigated why in future
+     * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel);
+     * */
     @Override
-    public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException{
-            return httpClientFactory.create(config.getDmaapPublisherConfiguration());
+    public Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) {
+        String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel);
+        DefaultHttpClient httpClient = new DefaultHttpClient();
+        HttpPost postRequest = new HttpPost(getUrl());
+        try {
+            StringEntity input = new StringEntity(json);
+            input.setContentType(config.getDmaapPublisherConfiguration().dmaapContentType());
+            postRequest.setEntity(input);
+            HttpResponse response = httpClient.execute(postRequest);
+            return Mono.just(response);
+        } catch (Exception e) {
+            LOGGER.warn("Publishing to DMaaP MR failed: {}", e);
+            return Mono.error(e);
+        }
+    }
 
+    private String getUrl() {
+        return (new URIBuilder()).scheme(config.getDmaapPublisherConfiguration().dmaapProtocol())
+            .host(config.getDmaapPublisherConfiguration().dmaapHostName())
+            .port(config.getDmaapPublisherConfiguration().dmaapPortNumber()).path(this.createRequestPath()).build()
+            .toString();
     }
+
+    private String createRequestPath() {
+        return "/" + config.getDmaapPublisherConfiguration().dmaapTopicName();
+    }
+
+
 }
\ No newline at end of file
index 16a6f8c..a7bf42d 100644 (file)
@@ -28,6 +28,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Predicate;
 import javax.net.ssl.SSLException;
+
 import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
 import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
 import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
@@ -42,6 +43,7 @@ import org.springframework.stereotype.Component;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClientResponse;
+import org.apache.http.HttpResponse;
 
 /**
  * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
@@ -97,7 +99,7 @@ public class ScheduledTasks {
                 .flatMap(this::processAdditionalFields)
                 .doOnError(exception ->
                     logger.warn("BBSActionsTask exception has been registered: ", exception))
-                .flatMap(this::publishToDmaapConfiguration)
+                .flatMap(this::publishToDmaapConfigurationWithApache)
                 .doOnError(exception ->
                     logger.warn("DMaaPProducerTask exception has been registered: ", exception))
                 .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
@@ -115,6 +117,10 @@ public class ScheduledTasks {
         logger.info("PRH tasks have been completed");
     }
 
+    /**
+     * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
+     * */
+    @Deprecated
     private void onSuccess(HttpClientResponse response) {
         String statusCode = Integer.toString(response.status().code());
         MDC.put(RESPONSE_CODE, statusCode);
@@ -123,6 +129,16 @@ public class ScheduledTasks {
         MDC.remove(RESPONSE_CODE);
     }
 
+    private void onSuccess(HttpResponse response) {
+        String statusCode = Integer.toString(response.getStatusLine().getStatusCode());
+        MDC.put(RESPONSE_CODE, statusCode);
+        logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
+            statusCode);
+        MDC.remove(RESPONSE_CODE);
+    }
+
+
+
     private void onError(Throwable throwable) {
         if (!(throwable instanceof DmaapEmptyResponseException)) {
             logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
@@ -159,6 +175,10 @@ public class ScheduledTasks {
         return bbsActionsTask.execute(consumerDmaapModel);
     }
 
+    /**
+     * Marked as deprecated due to problems with DMaaP MR, to be fixed in future
+     * */
+    @Deprecated
     private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
         try {
             return dmaapProducerTask.execute(monoAaiModel);
@@ -167,6 +187,16 @@ public class ScheduledTasks {
         }
     }
 
+    private Mono<HttpResponse> publishToDmaapConfigurationWithApache(ConsumerDmaapModel monoAaiModel) {
+        try {
+            return dmaapProducerTask.executeWithApache(monoAaiModel);
+        } catch (Exception e) {
+            return Mono.error(e);
+        }
+    }
+
+
+
     private Predicate<Throwable> resumePrhPredicate() {
         return exception -> exception instanceof PrhTaskException;
     }