Parallel execution of Client Health check
[policy/pap.git] / main / src / main / java / org / onap / policy / pap / main / rest / PolicyComponentsHealthCheckProvider.java
index 2da354d..09e148b 100644 (file)
 package org.onap.policy.pap.main.rest;
 
 import java.net.HttpURLConnection;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.tuple.Pair;
@@ -34,12 +42,12 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.http.client.HttpClient;
 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
 import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
-import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
 import org.onap.policy.common.endpoints.parameters.RestServerParameters;
 import org.onap.policy.common.endpoints.report.HealthCheckReport;
 import org.onap.policy.common.parameters.ParameterService;
 import org.onap.policy.common.utils.services.Registry;
 import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.base.PfModelRuntimeException;
 import org.onap.policy.models.pdp.concepts.Pdp;
 import org.onap.policy.models.pdp.concepts.PdpGroup;
 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
@@ -63,31 +71,25 @@ public class PolicyComponentsHealthCheckProvider {
     private static final String HEALTH_STATUS = "healthy";
     private static final Pattern IP_REPLACEMENT_PATTERN = Pattern.compile("//(\\S+):");
     private static final String POLICY_PAP_HEALTHCHECK_URI = "/policy/pap/v1/healthcheck";
-    private PapParameterGroup papParameterGroup = ParameterService.get(PAP_GROUP_PARAMS_NAME);
-    private List<HttpClient> clients = new ArrayList<>();
+    private static List<HttpClient> clients = new ArrayList<>();
+    private static ExecutorService clientHealthCheckExecutorService;
 
-    /**
-     * Constructs the object.
-     *
-     * @throws HttpClientConfigException if creating http client failed
-     */
-    public PolicyComponentsHealthCheckProvider() throws HttpClientConfigException {
-        this(HttpClientFactoryInstance.getClientFactory());
-    }
+    private PapParameterGroup papParameterGroup = ParameterService.get(PAP_GROUP_PARAMS_NAME);
 
     /**
-     * Constructs the object with provided http client factory.
-     *
-     * <p>This constructor is for unit test to use a mock {@link HttpClientFactory}.
-     *
-     * @param clientFactory factory used to construct http client
-     * @throws HttpClientConfigException if creating http client failed
+     * This method is used to initialize clients and executor.
+     * @param papParameterGroup
+     * @{link PapParameterGroup} contains the Pap Parameters set during startup
+     * @param clientFactory
+     * @{link HttpClientFactory} contains the client details
      */
-    PolicyComponentsHealthCheckProvider(HttpClientFactory clientFactory) throws HttpClientConfigException {
+    public static void initializeClientHealthCheckExecutorService(PapParameterGroup papParameterGroup,
+        HttpClientFactory clientFactory) throws HttpClientConfigException {
         for (BusTopicParams params : papParameterGroup.getHealthCheckRestClientParameters()) {
             params.setManaged(false);
             clients.add(clientFactory.build(params));
         }
+        clientHealthCheckExecutorService = Executors.newFixedThreadPool(clients.isEmpty() ? 1 : clients.size());
     }
 
     /**
@@ -100,19 +102,37 @@ public class PolicyComponentsHealthCheckProvider {
         Map<String, Object> result = new HashMap<>();
 
         // Check remote components
+        List<Callable<Entry<String, Object>>> tasks = new ArrayList<>();
+
         for (HttpClient client : clients) {
-            HealthCheckReport report = fetchPolicyComponentHealthStatus(client);
-            if (!report.isHealthy()) {
-                isHealthy = false;
-            }
-            result.put(client.getName(), report);
+            tasks.add(() -> new AbstractMap.SimpleEntry<>(client.getName(), fetchPolicyComponentHealthStatus(client)));
+        }
+
+        try {
+            List<Future<Entry<String, Object>>> futures = clientHealthCheckExecutorService.invokeAll(tasks);
+            result = futures.stream().map(entryFuture -> {
+                try {
+                    return entryFuture.get();
+                } catch (ExecutionException e) {
+                    throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check Failed ", e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check interrupted ", e);
+                }
+            }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+            //true when all the clients health status is true
+            isHealthy = result.values().stream().allMatch(o -> ((HealthCheckReport) o).isHealthy());
+        } catch (InterruptedException exp) {
+            Thread.currentThread().interrupt();
+            throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check interrupted ", exp);
         }
 
         // Check PAP itself
         HealthCheckReport papReport = new HealthCheckProvider().performHealthCheck();
         RestServerParameters restServerParameters = papParameterGroup.getRestServerParameters();
-        papReport.setUrl((restServerParameters.isHttps() ? "https://" : "http://") + papReport.getUrl() + ":"
-            + restServerParameters.getPort() + POLICY_PAP_HEALTHCHECK_URI);
+        papReport.setUrl(
+            (restServerParameters.isHttps() ? "https://" : "http://") + papReport.getUrl() + ":" + restServerParameters
+                .getPort() + POLICY_PAP_HEALTHCHECK_URI);
         if (!papReport.isHealthy()) {
             isHealthy = false;
         }
@@ -133,13 +153,13 @@ public class PolicyComponentsHealthCheckProvider {
 
         result.put(HEALTH_STATUS, isHealthy);
         LOGGER.debug("Policy Components HealthCheck Response - {}", result);
-        return Pair.of(Response.Status.OK, result);
+        return Pair.of(Status.OK, result);
     }
 
     private Map<String, List<Pdp>> fetchPdpsHealthStatus() throws PfModelException {
         Map<String, List<Pdp>> pdpListWithType = new HashMap<>();
-        final PolicyModelsProviderFactoryWrapper modelProviderWrapper =
-            Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class);
+        final PolicyModelsProviderFactoryWrapper modelProviderWrapper = Registry
+            .get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class);
         try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
             final List<PdpGroup> groups = databaseProvider.getPdpGroups(null);
             for (final PdpGroup group : groups) {
@@ -156,8 +176,7 @@ public class PolicyComponentsHealthCheckProvider {
         HealthCheckReport clientReport;
         try {
             Response resp = httpClient.get();
-            clientReport = replaceIpWithHostname(
-                resp.readEntity(HealthCheckReport.class), httpClient.getBaseUrl());
+            clientReport = replaceIpWithHostname(resp.readEntity(HealthCheckReport.class), httpClient.getBaseUrl());
 
             // A health report is read successfully when HTTP status is not OK, it is also not healthy
             // even in the report it says healthy.
@@ -191,4 +210,12 @@ public class PolicyComponentsHealthCheckProvider {
         }
         return report;
     }
-}
+
+    /**
+     * This method clears clients {@link List} and clientHealthCheckExecutorService {@link ExecutorService}.
+     */
+    public static void cleanup() {
+        clients.clear();
+        clientHealthCheckExecutorService.shutdown();
+    }
+}
\ No newline at end of file