Parallel execution of Client Health check 23/109423/5
authorRam Krishna Verma <ram_krishna.verma@bell.ca>
Fri, 19 Jun 2020 21:49:21 +0000 (17:49 -0400)
committerputhuparambil.aditya <aditya.puthuparambil@bell.ca>
Mon, 29 Jun 2020 11:03:29 +0000 (12:03 +0100)
Issue-ID: POLICY-2390
Signed-off-by: puthuparambil.aditya <aditya.puthuparambil@bell.ca>
Change-Id: I42f443c64bcf6652adb9795ee8e71e37d8fa8c71

main/src/main/java/org/onap/policy/pap/main/rest/PolicyComponentsHealthCheckProvider.java
main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java
main/src/test/java/org/onap/policy/pap/main/rest/TestPolicyComponentsHealthCheckProvider.java

index 2da354d..09e148b 100644 (file)
 package org.onap.policy.pap.main.rest;
 
 import java.net.HttpURLConnection;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.tuple.Pair;
@@ -34,12 +42,12 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 import org.onap.policy.common.endpoints.http.client.HttpClient;
 import org.onap.policy.common.endpoints.http.client.HttpClientConfigException;
 import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
-import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
 import org.onap.policy.common.endpoints.parameters.RestServerParameters;
 import org.onap.policy.common.endpoints.report.HealthCheckReport;
 import org.onap.policy.common.parameters.ParameterService;
 import org.onap.policy.common.utils.services.Registry;
 import org.onap.policy.models.base.PfModelException;
+import org.onap.policy.models.base.PfModelRuntimeException;
 import org.onap.policy.models.pdp.concepts.Pdp;
 import org.onap.policy.models.pdp.concepts.PdpGroup;
 import org.onap.policy.models.pdp.concepts.PdpSubGroup;
@@ -63,31 +71,25 @@ public class PolicyComponentsHealthCheckProvider {
     private static final String HEALTH_STATUS = "healthy";
     private static final Pattern IP_REPLACEMENT_PATTERN = Pattern.compile("//(\\S+):");
     private static final String POLICY_PAP_HEALTHCHECK_URI = "/policy/pap/v1/healthcheck";
-    private PapParameterGroup papParameterGroup = ParameterService.get(PAP_GROUP_PARAMS_NAME);
-    private List<HttpClient> clients = new ArrayList<>();
+    private static List<HttpClient> clients = new ArrayList<>();
+    private static ExecutorService clientHealthCheckExecutorService;
 
-    /**
-     * Constructs the object.
-     *
-     * @throws HttpClientConfigException if creating http client failed
-     */
-    public PolicyComponentsHealthCheckProvider() throws HttpClientConfigException {
-        this(HttpClientFactoryInstance.getClientFactory());
-    }
+    private PapParameterGroup papParameterGroup = ParameterService.get(PAP_GROUP_PARAMS_NAME);
 
     /**
-     * Constructs the object with provided http client factory.
-     *
-     * <p>This constructor is for unit test to use a mock {@link HttpClientFactory}.
-     *
-     * @param clientFactory factory used to construct http client
-     * @throws HttpClientConfigException if creating http client failed
+     * This method is used to initialize clients and executor.
+     * @param papParameterGroup
+     * @{link PapParameterGroup} contains the Pap Parameters set during startup
+     * @param clientFactory
+     * @{link HttpClientFactory} contains the client details
      */
-    PolicyComponentsHealthCheckProvider(HttpClientFactory clientFactory) throws HttpClientConfigException {
+    public static void initializeClientHealthCheckExecutorService(PapParameterGroup papParameterGroup,
+        HttpClientFactory clientFactory) throws HttpClientConfigException {
         for (BusTopicParams params : papParameterGroup.getHealthCheckRestClientParameters()) {
             params.setManaged(false);
             clients.add(clientFactory.build(params));
         }
+        clientHealthCheckExecutorService = Executors.newFixedThreadPool(clients.isEmpty() ? 1 : clients.size());
     }
 
     /**
@@ -100,19 +102,37 @@ public class PolicyComponentsHealthCheckProvider {
         Map<String, Object> result = new HashMap<>();
 
         // Check remote components
+        List<Callable<Entry<String, Object>>> tasks = new ArrayList<>();
+
         for (HttpClient client : clients) {
-            HealthCheckReport report = fetchPolicyComponentHealthStatus(client);
-            if (!report.isHealthy()) {
-                isHealthy = false;
-            }
-            result.put(client.getName(), report);
+            tasks.add(() -> new AbstractMap.SimpleEntry<>(client.getName(), fetchPolicyComponentHealthStatus(client)));
+        }
+
+        try {
+            List<Future<Entry<String, Object>>> futures = clientHealthCheckExecutorService.invokeAll(tasks);
+            result = futures.stream().map(entryFuture -> {
+                try {
+                    return entryFuture.get();
+                } catch (ExecutionException e) {
+                    throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check Failed ", e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check interrupted ", e);
+                }
+            }).collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+            //true when all the clients health status is true
+            isHealthy = result.values().stream().allMatch(o -> ((HealthCheckReport) o).isHealthy());
+        } catch (InterruptedException exp) {
+            Thread.currentThread().interrupt();
+            throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check interrupted ", exp);
         }
 
         // Check PAP itself
         HealthCheckReport papReport = new HealthCheckProvider().performHealthCheck();
         RestServerParameters restServerParameters = papParameterGroup.getRestServerParameters();
-        papReport.setUrl((restServerParameters.isHttps() ? "https://" : "http://") + papReport.getUrl() + ":"
-            + restServerParameters.getPort() + POLICY_PAP_HEALTHCHECK_URI);
+        papReport.setUrl(
+            (restServerParameters.isHttps() ? "https://" : "http://") + papReport.getUrl() + ":" + restServerParameters
+                .getPort() + POLICY_PAP_HEALTHCHECK_URI);
         if (!papReport.isHealthy()) {
             isHealthy = false;
         }
@@ -133,13 +153,13 @@ public class PolicyComponentsHealthCheckProvider {
 
         result.put(HEALTH_STATUS, isHealthy);
         LOGGER.debug("Policy Components HealthCheck Response - {}", result);
-        return Pair.of(Response.Status.OK, result);
+        return Pair.of(Status.OK, result);
     }
 
     private Map<String, List<Pdp>> fetchPdpsHealthStatus() throws PfModelException {
         Map<String, List<Pdp>> pdpListWithType = new HashMap<>();
-        final PolicyModelsProviderFactoryWrapper modelProviderWrapper =
-            Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class);
+        final PolicyModelsProviderFactoryWrapper modelProviderWrapper = Registry
+            .get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class);
         try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) {
             final List<PdpGroup> groups = databaseProvider.getPdpGroups(null);
             for (final PdpGroup group : groups) {
@@ -156,8 +176,7 @@ public class PolicyComponentsHealthCheckProvider {
         HealthCheckReport clientReport;
         try {
             Response resp = httpClient.get();
-            clientReport = replaceIpWithHostname(
-                resp.readEntity(HealthCheckReport.class), httpClient.getBaseUrl());
+            clientReport = replaceIpWithHostname(resp.readEntity(HealthCheckReport.class), httpClient.getBaseUrl());
 
             // A health report is read successfully when HTTP status is not OK, it is also not healthy
             // even in the report it says healthy.
@@ -191,4 +210,12 @@ public class PolicyComponentsHealthCheckProvider {
         }
         return report;
     }
-}
+
+    /**
+     * This method clears clients {@link List} and clientHealthCheckExecutorService {@link ExecutorService}.
+     */
+    public static void cleanup() {
+        clients.clear();
+        clientHealthCheckExecutorService.shutdown();
+    }
+}
\ No newline at end of file
index fd13c23..4272e49 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicReference;
 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
 import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
 import org.onap.policy.common.endpoints.http.server.RestServer;
 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
 import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher;
@@ -57,6 +58,7 @@ import org.onap.policy.pap.main.rest.PdpGroupHealthCheckControllerV1;
 import org.onap.policy.pap.main.rest.PdpGroupQueryControllerV1;
 import org.onap.policy.pap.main.rest.PdpGroupStateChangeControllerV1;
 import org.onap.policy.pap.main.rest.PolicyComponentsHealthCheckControllerV1;
+import org.onap.policy.pap.main.rest.PolicyComponentsHealthCheckProvider;
 import org.onap.policy.pap.main.rest.PolicyStatusControllerV1;
 import org.onap.policy.pap.main.rest.PolicyUndeployerImpl;
 import org.onap.policy.pap.main.rest.StatisticsRestControllerV1;
@@ -236,6 +238,12 @@ public class PapActivator extends ServiceManagerContainer {
                                     .build()),
             () -> Registry.unregister(PapConstants.REG_PDP_TRACKER));
 
+        addAction("PAP client executor",
+            () ->
+                PolicyComponentsHealthCheckProvider.initializeClientHealthCheckExecutorService(papParameterGroup,
+                                HttpClientFactoryInstance.getClientFactory()),
+                PolicyComponentsHealthCheckProvider::cleanup);
+
         addAction("REST server",
             () -> {
                 RestServer server = new RestServer(papParameterGroup.getRestServerParameters(), PapAafFilter.class,
index 9d2fa63..bdf4700 100644 (file)
@@ -65,6 +65,7 @@ public class TestPolicyComponentsHealthCheckProvider {
     private static final String CLIENT_1 = "client1";
     private static final String PDP_GROUP_DATA_FILE = "rest/pdpGroup.json";
     private static final String PAP_GROUP_PARAMS_NAME = "PapGroup";
+    private static final String HEALTHY = "healthy";
 
     @Mock
     private PolicyModelsProvider dao;
@@ -135,6 +136,9 @@ public class TestPolicyComponentsHealthCheckProvider {
         List<BusTopicParams> params = papParameterGroup.getHealthCheckRestClientParameters();
         when(clientFactory.build(params.get(0))).thenReturn(client1);
         when(clientFactory.build(params.get(1))).thenReturn(client2);
+
+        PolicyComponentsHealthCheckProvider.initializeClientHealthCheckExecutorService(papParameterGroup,
+            clientFactory);
     }
 
     /**
@@ -147,56 +151,58 @@ public class TestPolicyComponentsHealthCheckProvider {
         } else {
             ParameterService.deregister(PAP_GROUP_PARAMS_NAME);
         }
+        PolicyComponentsHealthCheckProvider.cleanup();
     }
 
+
     @Test
-    public void testFetchPolicyComponentsHealthStatus_allHealthy() throws Exception {
-        PolicyComponentsHealthCheckProvider provider = new PolicyComponentsHealthCheckProvider(clientFactory);
+    public void testFetchPolicyComponentsHealthStatus_allHealthy() {
+        PolicyComponentsHealthCheckProvider provider = new PolicyComponentsHealthCheckProvider();
         Pair<Status, Map<String, Object>> ret = provider.fetchPolicyComponentsHealthStatus();
         assertEquals(Response.Status.OK, ret.getLeft());
-        assertTrue((Boolean) ret.getRight().get("healthy"));
+        assertTrue((Boolean) ret.getRight().get(HEALTHY));
     }
 
     @Test
-    public void testFetchPolicyComponentsHealthStatus_unhealthyClient() throws Exception {
+    public void testFetchPolicyComponentsHealthStatus_unhealthyClient() {
         when(response1.getStatus()).thenReturn(HttpURLConnection.HTTP_INTERNAL_ERROR);
         when(response1.readEntity(HealthCheckReport.class))
-                .thenReturn(createReport(HttpURLConnection.HTTP_INTERNAL_ERROR, false));
+            .thenReturn(createReport(HttpURLConnection.HTTP_INTERNAL_ERROR, false));
         Map<String, Object> result = callFetchPolicyComponentsHealthStatus();
-        assertFalse((Boolean) result.get("healthy"));
+        assertFalse((Boolean) result.get(HEALTHY));
         HealthCheckReport report = (HealthCheckReport) result.get(CLIENT_1);
         assertFalse(report.isHealthy());
 
         when(response1.getStatus()).thenReturn(HttpURLConnection.HTTP_OK);
         when(response1.readEntity(HealthCheckReport.class)).thenReturn(createReport(HttpURLConnection.HTTP_OK, false));
         Map<String, Object> result2 = callFetchPolicyComponentsHealthStatus();
-        assertFalse((Boolean) result2.get("healthy"));
+        assertFalse((Boolean) result2.get(HEALTHY));
         HealthCheckReport report2 = (HealthCheckReport) result.get(CLIENT_1);
         assertFalse(report2.isHealthy());
     }
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testFetchPolicyComponentsHealthStatus_unhealthyPdps() throws Exception {
+    public void testFetchPolicyComponentsHealthStatus_unhealthyPdps() {
         // Get a PDP and set it unhealthy
         groups.get(0).getPdpSubgroups().get(0).getPdpInstances().get(0).setHealthy(PdpHealthStatus.NOT_HEALTHY);
         Map<String, Object> result = callFetchPolicyComponentsHealthStatus();
         Map<String, List<Pdp>> pdpListWithType = (Map<String, List<Pdp>>) result.get(PapConstants.POLICY_PDPS);
         assertEquals(2, pdpListWithType.size());
-        assertFalse((Boolean) result.get("healthy"));
+        assertFalse((Boolean) result.get(HEALTHY));
     }
 
     @Test
-    public void testFetchPolicyComponentsHealthStatus_unhealthyPap() throws Exception {
+    public void testFetchPolicyComponentsHealthStatus_unhealthyPap() {
         when(papActivator.isAlive()).thenReturn(false);
         Map<String, Object> result = callFetchPolicyComponentsHealthStatus();
-        assertFalse((Boolean) result.get("healthy"));
+        assertFalse((Boolean) result.get(HEALTHY));
         HealthCheckReport report = (HealthCheckReport) result.get(PapConstants.POLICY_PAP);
         assertFalse(report.isHealthy());
     }
 
-    private Map<String, Object> callFetchPolicyComponentsHealthStatus() throws Exception {
-        PolicyComponentsHealthCheckProvider provider = new PolicyComponentsHealthCheckProvider(clientFactory);
+    private Map<String, Object> callFetchPolicyComponentsHealthStatus() {
+        PolicyComponentsHealthCheckProvider provider = new PolicyComponentsHealthCheckProvider();
         return provider.fetchPolicyComponentsHealthStatus().getRight();
     }