f240556f2fb3bc7d432962c9e75f9b556d8b7e4e
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / onap / dcae / apod / analytics / cdap / tca / flowlet / TCAVESAlertsAbatementFlowlet.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.dcae.apod.analytics.cdap.tca.flowlet;
22
23 import co.cask.cdap.api.annotation.Output;
24 import co.cask.cdap.api.annotation.ProcessInput;
25 import co.cask.cdap.api.annotation.Property;
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
27 import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
28 import co.cask.cdap.api.flow.flowlet.FlowletContext;
29 import co.cask.cdap.api.flow.flowlet.OutputEmitter;
30 import org.apache.commons.lang3.StringUtils;
31 import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
32 import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
33 import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
34 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
35 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
36 import org.onap.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
37 import org.onap.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
38 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
39 import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
40 import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
41 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
42 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
43 import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
44 import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import redis.clients.jedis.HostAndPort;
48 import redis.clients.jedis.JedisCluster;
49 import redis.clients.jedis.JedisPool;
50 import redis.clients.jedis.exceptions.JedisConnectionException;
51
52 import java.io.IOException;
53 import java.util.Date;
54 import java.util.LinkedHashSet;
55 import java.util.Map;
56 import java.util.Set;
57
58 /**
59  * Flowlet responsible to sending out abatement alerts
60  *
61  * @author Rajiv Singla . Creation Date: 9/11/2017.
62  */
63 public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
64
65     private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);
66
67     @Property
68     private final String tcaAlertsAbatementTableName;
69
70     private Set<HostAndPort> redisHostAndPorts = null;
71
72     @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
73     protected OutputEmitter<String> alertsAbatementOutputEmitter;
74
75     private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;
76
77     public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {
78         this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;
79     }
80
81     @Override
82     public void configure() {
83         setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);
84         setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);
85     }
86
87     @Override
88     public void initialize(FlowletContext flowletContext) throws Exception {
89         super.initialize(flowletContext);
90         tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
91         // Parse runtime arguments
92         final TCAAppPreferences tcaAppPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(flowletContext);
93         if(tcaAppPreferences.getEnableRedisCaching()) {
94             final String redisHosts = tcaAppPreferences.getRedisHosts();
95             LOG.info("Redis Distributed Caching is enabled for abated alerts with Redis Hosts: {}", redisHosts);
96             redisHostAndPorts = getRedisHostsAndPorts(redisHosts);
97             checkRedisConnection(redisHostAndPorts);
98         } else {
99             LOG.info("Redis Distributed caching is disabled for abated alerts");
100         }
101     }
102
103     @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
104     public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException {
105
106         final String cefMessage = thresholdCalculatorOutput.getCefMessage();
107         final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();
108         final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();
109
110         // alerts must have violated metrics per event name present
111         if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {
112             final String errorMessage = String.format(
113                     "No violated metricsPerEventName found for VES Message: %s." +
114                             "Ignored alert message: %s", cefMessage, alertMessageString);
115             throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
116         }
117
118         final MetricsPerEventName violatedMetricsPerEventName =
119                 TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);
120         final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);
121         final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);
122         final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);
123         final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();
124
125         switch (closedLoopEventStatus) {
126
127             case ONSET:
128
129                 LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
130                 TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
131                         null, tcaAlertsAbatementTable, redisHostAndPorts);
132                 LOG.debug("Emitting ONSET alert: {}", alertMessageString);
133                 alertsAbatementOutputEmitter.emit(alertMessageString);
134                 break;
135
136             case ABATED:
137
138                 LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
139                 final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
140                         TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
141                                 tcaAlertsAbatementTable, redisHostAndPorts);
142
143                 if (previousAlertsAbatementEntry != null) {
144
145                     LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);
146
147                     final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();
148                     if (abatementSentTS != null) {
149                         LOG.debug("Abatement alert was already sent at timestamp: {}. " +
150                                 "Skip resending this abatement alert again", abatementSentTS);
151                     } else {
152
153                         final long newAbatementSentTS = new Date().getTime();
154                         LOG.debug(
155                                 "No abatement alert was sent before." +
156                                         "Sending abatement alert:{} for the first time at:{}",
157                                 alertMessageString, newAbatementSentTS);
158
159                         // save new Abatement alert sent timestamp in table
160                         TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
161                                 Long.toString(newAbatementSentTS), tcaAlertsAbatementTable, redisHostAndPorts);
162
163                         // Set request id to be same as previous ONSET event request ID
164                         tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());
165                         final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse);
166
167                         LOG.info("Emitting ABATED alert: {}", abatedAlertString);
168                         alertsAbatementOutputEmitter.emit(abatedAlertString);
169
170                     }
171
172                 } else {
173                     LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",
174                             alertMessageString);
175                 }
176
177                 break;
178
179             default:
180
181                 final String errorMessage = String.format(
182                         "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +
183                                 "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);
184                 throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
185
186         }
187
188
189     }
190
191     private static Set<HostAndPort> getRedisHostsAndPorts(final String redisHosts) {
192         final LinkedHashSet<HostAndPort> hostAndPorts = new LinkedHashSet<>();
193         final String[] redisHostsString = redisHosts.split(",");
194         for (String redisHostString : redisHostsString) {
195             hostAndPorts.add(HostAndPort.parseString(redisHostString.trim()));
196         }
197         return hostAndPorts;
198     }
199
200     private static void checkRedisConnection(final Set<HostAndPort> redisHostAndPorts) {
201         LOG.info("Checking Redis Connection for Redis Hosts: {}", redisHostAndPorts);
202         try (final JedisCluster jedisCluster = new JedisCluster(redisHostAndPorts)) {
203             final Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
204             jedisCluster.get("testKey");
205             LOG.info("Confirmed redis cluster Nodes: {}", clusterNodes.keySet());
206         } catch (JedisConnectionException | IOException e) {
207             LOG.error("Unable to make Redis connection for given redisHosts: {}", redisHostAndPorts);
208             throw new DCAEAnalyticsRuntimeException("No Redis Connection", LOG, e);
209         }
210     }
211
212 }