759c3d5332e1e8148f7124eb1470e39e412d866b
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / onap / dcae / apod / analytics / cdap / tca / flowlet / TCAVESAlertsAbatementFlowlet.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.dcae.apod.analytics.cdap.tca.flowlet;
22
23 import co.cask.cdap.api.annotation.Output;
24 import co.cask.cdap.api.annotation.ProcessInput;
25 import co.cask.cdap.api.annotation.Property;
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
27 import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
28 import co.cask.cdap.api.flow.flowlet.FlowletContext;
29 import co.cask.cdap.api.flow.flowlet.OutputEmitter;
30 import org.apache.commons.lang3.StringUtils;
31 import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
32 import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
33 import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
34 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
35 import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
36 import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
37 import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
38 import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
39 import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
40 import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
41 import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import java.io.IOException;
46 import java.util.Date;
47
48 /**
49  * Flowlet responsible to sending out abatement alerts
50  *
51  * @author Rajiv Singla . Creation Date: 9/11/2017.
52  */
53 public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
54
55     private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);
56
57     @Property
58     private final String tcaAlertsAbatementTableName;
59
60     @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
61     protected OutputEmitter<String> alertsAbatementOutputEmitter;
62
63     private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;
64
65     public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {
66         this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;
67     }
68
69     @Override
70     public void configure() {
71         setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);
72         setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);
73     }
74
75     @Override
76     public void initialize(FlowletContext flowletContext) throws Exception {
77         super.initialize(flowletContext);
78         tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
79     }
80
81     @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
82     public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException {
83
84         final String cefMessage = thresholdCalculatorOutput.getCefMessage();
85         final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();
86         final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();
87
88         // alerts must have violated metrics per event name present
89         if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {
90             final String errorMessage = String.format(
91                     "No violated metricsPerEventName found for VES Message: %s." +
92                             "Ignored alert message: %s", cefMessage, alertMessageString);
93             throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
94         }
95
96         final MetricsPerEventName violatedMetricsPerEventName =
97                 TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);
98         final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);
99         final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);
100         final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);
101         final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();
102
103         switch (closedLoopEventStatus) {
104
105             case ONSET:
106
107                 LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
108                 TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
109                         null, tcaAlertsAbatementTable);
110                 LOG.debug("Emitting ONSET alert: {}", alertMessageString);
111                 alertsAbatementOutputEmitter.emit(alertMessageString);
112                 break;
113
114             case ABATED:
115
116                 LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
117                 final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
118                         TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
119                                 tcaAlertsAbatementTable);
120
121                 if (previousAlertsAbatementEntry != null) {
122
123                     LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);
124
125                     final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();
126                     if (abatementSentTS != null) {
127                         LOG.debug("Abatement alert was already sent at timestamp: {}. " +
128                                 "Skip resending this abatement alert again", abatementSentTS);
129                     } else {
130
131                         final long newAbatementSentTS = new Date().getTime();
132                         LOG.debug(
133                                 "No abatement alert was sent before." +
134                                         "Sending abatement alert:{} for the first time at:{}",
135                                 alertMessageString, newAbatementSentTS);
136
137                         // save new Abatement alert sent timestamp in table
138                         TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
139                                 Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
140
141                         // Set request id to be same as previous ONSET event request ID
142                         tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());
143                         final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse);
144
145                         LOG.info("Emitting ABATED alert: {}", abatedAlertString);
146                         alertsAbatementOutputEmitter.emit(abatedAlertString);
147
148                     }
149
150                 } else {
151                     LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",
152                             alertMessageString);
153                 }
154
155                 break;
156
157             default:
158
159                 final String errorMessage = String.format(
160                         "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +
161                                 "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);
162                 throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
163
164         }
165
166
167     }
168
169 }