Added Redis Support
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-common / src / main / java / org / onap / dcae / apod / analytics / cdap / common / persistance / tca / TCAAlertsAbatementPersister.java
index 19cf9c7..57e2a59 100644 (file)
@@ -35,11 +35,16 @@ import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
 import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
+import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
 
+import java.io.IOException;
 import java.util.Date;
 import java.util.List;
+import java.util.Set;
 
 import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;
 
@@ -85,14 +90,21 @@ public abstract class TCAAlertsAbatementPersister {
                                final MetricsPerEventName violatedMetricsPerEventName,
                                final TCAVESResponse tcavesResponse,
                                final String abatementTS,
-                               final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable) {
+                               final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable,
+                               final Set<HostAndPort> redisHostAndPorts) {
         final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName);
 
         final long currentTimestamp = new Date().getTime();
         final String requestID = tcavesResponse.getRequestID();
         final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = new TCAAlertsAbatementEntity(currentTimestamp,
                 requestID, abatementTS);
-        tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity);
+
+        // if redis is enabled save entity in redis cluster
+        if (redisHostAndPorts != null) {
+            persistAlertAbatementEntityInRedis(redisHostAndPorts, abatementTableKey, tcaAlertsAbatementEntity);
+        } else {
+            tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity);
+        }
 
         LOG.debug("Persisted AlertsAbatementEntity: {} with Key: {}", tcaAlertsAbatementEntity, abatementTableKey);
 
@@ -101,12 +113,18 @@ public abstract class TCAAlertsAbatementPersister {
     public static TCAAlertsAbatementEntity lookUpByKey(final EventListener eventListener,
                                                        final MetricsPerEventName violatedMetricsPerEventName,
                                                        final ObjectMappedTable<TCAAlertsAbatementEntity>
-                                                               tcaAlertsAbatementTable) {
+                                                               tcaAlertsAbatementTable,
+                                                       final Set<HostAndPort> redisHostAndPorts) {
         final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName);
+
+        // if redis is enabled get entity from redis cluster
+        if (redisHostAndPorts != null) {
+            return getAlertAbatementEntityFromRedis(redisHostAndPorts, abatementTableKey);
+        }
+
         return tcaAlertsAbatementTable.read(abatementTableKey);
     }
 
-
     public static String createKey(final EventListener eventListener,
                                    final MetricsPerEventName violatedMetricsPerEventName) {
         // no null check required as all are required fields
@@ -124,4 +142,32 @@ public abstract class TCAAlertsAbatementPersister {
         return KEY_JOINER.join(abatementKeyList);
     }
 
+    private static TCAAlertsAbatementEntity getAlertAbatementEntityFromRedis(final Set<HostAndPort> redisHostAndPorts,
+                                                                            final String abatementTableKey) {
+        try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
+            if (jedisCluster.exists(abatementTableKey)) {
+                return AnalyticsModelJsonUtils.readValue(jedisCluster.get(abatementTableKey),
+                        TCAAlertsAbatementEntity.class);
+            } else {
+                return null;
+            }
+        } catch (IOException e) {
+            final String errorMessage = String.format("Unable to look up key: %s in redis cluster: %s",
+                    abatementTableKey, redisHostAndPorts);
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+        }
+    }
+
+    private static void persistAlertAbatementEntityInRedis(final Set<HostAndPort> redisHostAndPorts,
+                                                          final String abatementTableKey,
+                                                          final TCAAlertsAbatementEntity tcaAlertsAbatementEntity) {
+        try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
+            jedisCluster.set(abatementTableKey, AnalyticsModelJsonUtils.writeValueAsString(tcaAlertsAbatementEntity));
+        } catch (IOException e) {
+            final String errorMessage = String.format("Unable to store key:value - %s:%s in redis cluster: %s",
+                    abatementTableKey, tcaAlertsAbatementEntity, redisHostAndPorts);
+            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
+        }
+    }
+
 }