57e2a593388bb118dcf92d96bdd632375b3ae7c1
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-common / src / main / java / org / onap / dcae / apod / analytics / cdap / common / persistance / tca / TCAAlertsAbatementPersister.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.dcae.apod.analytics.cdap.common.persistance.tca;
22
23 import co.cask.cdap.api.data.schema.Schema;
24 import co.cask.cdap.api.data.schema.UnsupportedTypeException;
25 import co.cask.cdap.api.dataset.DatasetProperties;
26 import co.cask.cdap.api.dataset.lib.IndexedTable;
27 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTableProperties;
29 import com.google.common.base.Joiner;
30 import com.google.common.collect.ImmutableList;
31 import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
32 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
33 import org.onap.dcae.apod.analytics.common.utils.PersistenceUtils;
34 import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
35 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
36 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
37 import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
38 import org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import redis.clients.jedis.HostAndPort;
42 import redis.clients.jedis.JedisCluster;
43
44 import java.io.IOException;
45 import java.util.Date;
46 import java.util.List;
47 import java.util.Set;
48
49 import static org.onap.dcae.apod.analytics.common.utils.PersistenceUtils.TABLE_ROW_KEY_COLUMN_NAME;
50
51 /**
52  * Utility methods to persist TCA Alerts Abatement information
53  *
54  * @author Rajiv Singla . Creation Date: 9/11/2017.
55  */
56 public abstract class TCAAlertsAbatementPersister {
57
58     private static final Logger LOG = LoggerFactory.getLogger(TCAAlertsAbatementPersister.class);
59
60     private static final Joiner KEY_JOINER = Joiner.on(PersistenceUtils.ROW_KEY_DELIMITER);
61
62     private TCAAlertsAbatementPersister() {
63         // private constructor
64     }
65
66     /**
67      * Creates {@link DatasetProperties} for Alerts Table
68      *
69      * @param timeToLiveSeconds alerts table Time to Live
70      *
71      * @return Alerts Abatement table properties
72      */
73     public static DatasetProperties getDatasetProperties(final int timeToLiveSeconds) {
74         try {
75             return ObjectMappedTableProperties.builder()
76                     .setType(TCAAlertsAbatementEntity.class)
77                     .setRowKeyExploreName(TABLE_ROW_KEY_COLUMN_NAME)
78                     .setRowKeyExploreType(Schema.Type.STRING)
79                     .add(IndexedTable.PROPERTY_TTL, timeToLiveSeconds)
80                     .setDescription(CDAPComponentsConstants.TCA_FIXED_ALERTS_ABATEMENT_DESCRIPTION_TABLE)
81                     .build();
82         } catch (UnsupportedTypeException e) {
83             final String errorMessage = "Unable to convert TCAAlertsAbatementEntity class to Schema";
84             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
85         }
86     }
87
88
89     public static void persist(final EventListener eventListener,
90                                final MetricsPerEventName violatedMetricsPerEventName,
91                                final TCAVESResponse tcavesResponse,
92                                final String abatementTS,
93                                final ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable,
94                                final Set<HostAndPort> redisHostAndPorts) {
95         final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName);
96
97         final long currentTimestamp = new Date().getTime();
98         final String requestID = tcavesResponse.getRequestID();
99         final TCAAlertsAbatementEntity tcaAlertsAbatementEntity = new TCAAlertsAbatementEntity(currentTimestamp,
100                 requestID, abatementTS);
101
102         // if redis is enabled save entity in redis cluster
103         if (redisHostAndPorts != null) {
104             persistAlertAbatementEntityInRedis(redisHostAndPorts, abatementTableKey, tcaAlertsAbatementEntity);
105         } else {
106             tcaAlertsAbatementTable.write(abatementTableKey, tcaAlertsAbatementEntity);
107         }
108
109         LOG.debug("Persisted AlertsAbatementEntity: {} with Key: {}", tcaAlertsAbatementEntity, abatementTableKey);
110
111     }
112
113     public static TCAAlertsAbatementEntity lookUpByKey(final EventListener eventListener,
114                                                        final MetricsPerEventName violatedMetricsPerEventName,
115                                                        final ObjectMappedTable<TCAAlertsAbatementEntity>
116                                                                tcaAlertsAbatementTable,
117                                                        final Set<HostAndPort> redisHostAndPorts) {
118         final String abatementTableKey = createKey(eventListener, violatedMetricsPerEventName);
119
120         // if redis is enabled get entity from redis cluster
121         if (redisHostAndPorts != null) {
122             return getAlertAbatementEntityFromRedis(redisHostAndPorts, abatementTableKey);
123         }
124
125         return tcaAlertsAbatementTable.read(abatementTableKey);
126     }
127
128     public static String createKey(final EventListener eventListener,
129                                    final MetricsPerEventName violatedMetricsPerEventName) {
130         // no null check required as all are required fields
131         final String eventName = violatedMetricsPerEventName.getEventName();
132         final String sourceName = eventListener.getEvent().getCommonEventHeader().getSourceName();
133         final String reportingEntityName = eventListener.getEvent().getCommonEventHeader().getReportingEntityName();
134         // violated threshold will always be present
135         final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);
136         final String closedLoopControlName = violatedThreshold.getClosedLoopControlName();
137         final String fieldPath = violatedThreshold.getFieldPath();
138
139         final List<String> abatementKeyList =
140                 ImmutableList.of(eventName, sourceName, reportingEntityName, closedLoopControlName, fieldPath);
141
142         return KEY_JOINER.join(abatementKeyList);
143     }
144
145     private static TCAAlertsAbatementEntity getAlertAbatementEntityFromRedis(final Set<HostAndPort> redisHostAndPorts,
146                                                                             final String abatementTableKey) {
147         try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
148             if (jedisCluster.exists(abatementTableKey)) {
149                 return AnalyticsModelJsonUtils.readValue(jedisCluster.get(abatementTableKey),
150                         TCAAlertsAbatementEntity.class);
151             } else {
152                 return null;
153             }
154         } catch (IOException e) {
155             final String errorMessage = String.format("Unable to look up key: %s in redis cluster: %s",
156                     abatementTableKey, redisHostAndPorts);
157             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
158         }
159     }
160
161     private static void persistAlertAbatementEntityInRedis(final Set<HostAndPort> redisHostAndPorts,
162                                                           final String abatementTableKey,
163                                                           final TCAAlertsAbatementEntity tcaAlertsAbatementEntity) {
164         try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
165             jedisCluster.set(abatementTableKey, AnalyticsModelJsonUtils.writeValueAsString(tcaAlertsAbatementEntity));
166         } catch (IOException e) {
167             final String errorMessage = String.format("Unable to store key:value - %s:%s in redis cluster: %s",
168                     abatementTableKey, tcaAlertsAbatementEntity, redisHostAndPorts);
169             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
170         }
171     }
172
173 }