543fc9e97439eb72058b9d5a7d63119904fdebb7
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / flowlet / TCAVESAlertsAbatementFlowlet.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
22
23 import co.cask.cdap.api.annotation.Output;
24 import co.cask.cdap.api.annotation.ProcessInput;
25 import co.cask.cdap.api.annotation.Property;
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
27 import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
28 import co.cask.cdap.api.flow.flowlet.FlowletContext;
29 import co.cask.cdap.api.flow.flowlet.OutputEmitter;
30 import org.apache.commons.lang3.StringUtils;
31 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
32 import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
33 import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
34 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
35 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
36 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
37 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ControlLoopEventStatus;
38 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
39 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
40 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
41 import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import java.util.Date;
46
47 /**
48  * Flowlet responsible to sending out abatement alerts
49  *
50  * @author rs153v (Rajiv Singla) . Creation Date: 9/11/2017.
51  */
52 public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
53
54     private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);
55
56     @Property
57     private final String tcaAlertsAbatementTableName;
58
59     @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
60     protected OutputEmitter<String> alertsAbatementOutputEmitter;
61
62     private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;
63
64     public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {
65         this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;
66     }
67
68     @Override
69     public void configure() {
70         setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);
71         setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);
72     }
73
74     @Override
75     public void initialize(FlowletContext flowletContext) throws Exception {
76         super.initialize(flowletContext);
77         tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
78     }
79
80     @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
81     public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws Exception {
82
83         final String cefMessage = thresholdCalculatorOutput.getCefMessage();
84         final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();
85         final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();
86
87         // alerts must have violated metrics per event name present
88         if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {
89             final String errorMessage = String.format(
90                     "No violated metricsPerEventName found for VES Message: %s." +
91                             "Ignored alert message: %s", cefMessage, alertMessageString);
92             throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
93         }
94
95         final MetricsPerEventName violatedMetricsPerEventName =
96                 TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);
97         final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);
98         final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);
99         final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);
100         final ControlLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();
101
102         switch (closedLoopEventStatus) {
103
104             case ONSET:
105
106                 LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
107                 TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
108                         null, tcaAlertsAbatementTable);
109                 LOG.info("Emitting ONSET alert: {}", alertMessageString);
110                 alertsAbatementOutputEmitter.emit(alertMessageString);
111                 break;
112
113             case ABATED:
114
115                 LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
116                 final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
117                         TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
118                                 tcaAlertsAbatementTable);
119
120                 if (previousAlertsAbatementEntry != null) {
121
122                     LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);
123
124                     final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();
125                     if (abatementSentTS != null) {
126                         LOG.debug("Abatement alert was already sent at timestamp: {}. " +
127                                 "Skip resending this abatement alert again", abatementSentTS);
128                     } else {
129
130                         final long newAbatementSentTS = new Date().getTime();
131                         LOG.debug(
132                                 "No abatement alert was sent before." +
133                                         "Sending abatement alert:{} for the first time at:{}",
134                                 alertMessageString, newAbatementSentTS);
135
136                         // save new Abatement alert sent timestamp in table
137                         TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
138                                 Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
139                         LOG.info("Emitting ABATED alert: {}", alertMessageString);
140                         alertsAbatementOutputEmitter.emit(alertMessageString);
141
142                     }
143
144                 } else {
145                     LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",
146                             alertMessageString);
147                 }
148
149                 break;
150
151             default:
152
153                 final String errorMessage = String.format(
154                         "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +
155                                 "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);
156                 throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
157
158         }
159
160
161     }
162
163 }