Added Redis Support
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / onap / dcae / apod / analytics / cdap / tca / flowlet / TCAVESAlertsAbatementFlowlet.java
index 759c3d5..f240556 100644 (file)
@@ -33,6 +33,9 @@ import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOu
 import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
+import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
+import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
 import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
 import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
@@ -41,9 +44,16 @@ import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
 import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.exceptions.JedisConnectionException;
 
 import java.io.IOException;
 import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Flowlet responsible to sending out abatement alerts
@@ -57,6 +67,8 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
     @Property
     private final String tcaAlertsAbatementTableName;
 
+    private Set<HostAndPort> redisHostAndPorts = null;
+
     @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
     protected OutputEmitter<String> alertsAbatementOutputEmitter;
 
@@ -76,6 +88,16 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
     public void initialize(FlowletContext flowletContext) throws Exception {
         super.initialize(flowletContext);
         tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
+        // Parse runtime arguments
+        final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext);
+        if(tcaAppPreferences.getEnableRedisCaching()) {
+            final String redisHosts = tcaAppPreferences.getRedisHosts();
+            LOG.info("Redis Distributed Caching is enabled for abated alerts with Redis Hosts: {}", redisHosts);
+            redisHostAndPorts = getRedisHostsAndPorts(redisHosts);
+            checkRedisConnection(redisHostAndPorts);
+        } else {
+            LOG.info("Redis Distributed caching is disabled for abated alerts");
+        }
     }
 
     @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
@@ -106,7 +128,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
 
                 LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
                 TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
-                        null, tcaAlertsAbatementTable);
+                        null, tcaAlertsAbatementTable, redisHostAndPorts);
                 LOG.debug("Emitting ONSET alert: {}", alertMessageString);
                 alertsAbatementOutputEmitter.emit(alertMessageString);
                 break;
@@ -116,7 +138,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
                 LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
                 final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
                         TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
-                                tcaAlertsAbatementTable);
+                                tcaAlertsAbatementTable, redisHostAndPorts);
 
                 if (previousAlertsAbatementEntry != null) {
 
@@ -136,7 +158,7 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
 
                         // save new Abatement alert sent timestamp in table
                         TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
-                                Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
+                                Long.toString(newAbatementSentTS), tcaAlertsAbatementTable, redisHostAndPorts);
 
                         // Set request id to be same as previous ONSET event request ID
                         tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());
@@ -166,4 +188,25 @@ public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
 
     }
 
+    private static Set<HostAndPort> getRedisHostsAndPorts(final String redisHosts) {
+        final LinkedHashSet<HostAndPort> hostAndPorts = new LinkedHashSet<>();
+        final String[] redisHostsString = redisHosts.split(",");
+        for (String redisHostString : redisHostsString) {
+            hostAndPorts.add(HostAndPort.parseString(redisHostString.trim()));
+        }
+        return hostAndPorts;
+    }
+
+    private static void checkRedisConnection(final Set<HostAndPort> redisHostAndPorts) {
+        LOG.info("Checking Redis Connection for Redis Hosts: {}", redisHostAndPorts);
+        try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
+            final Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
+            jedisCluster.get("testKey");
+            LOG.info("Confirmed redis cluster Nodes: {}", clusterNodes.keySet());
+        } catch (JedisConnectionException | IOException e) {
+            LOG.error("Unable to make Redis connection for given redisHosts: {}", redisHostAndPorts);
+            throw new DCAEAnalyticsRuntimeException("No Redis Connection", LOG, e);
+        }
+    }
+
 }