TCA: Support for VES/A&AI enrichment
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / flowlet / TCAVESAlertsAbatementFlowlet.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;\r
22 \r
23 import co.cask.cdap.api.annotation.Output;\r
24 import co.cask.cdap.api.annotation.ProcessInput;\r
25 import co.cask.cdap.api.annotation.Property;\r
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;\r
27 import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;\r
28 import co.cask.cdap.api.flow.flowlet.FlowletContext;\r
29 import co.cask.cdap.api.flow.flowlet.OutputEmitter;\r
30 import org.apache.commons.lang3.StringUtils;\r
31 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;\r
32 import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;\r
33 import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;\r
34 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;\r
35 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;\r
36 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;\r
37 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;\r
38 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;\r
39 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;\r
40 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;\r
41 import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;\r
42 import org.slf4j.Logger;\r
43 import org.slf4j.LoggerFactory;\r
44 \r
45 import java.io.IOException;\r
46 import java.util.Date;\r
47 \r
48 /**\r
49  * Flowlet responsible to sending out abatement alerts\r
50  *\r
51  * @author Rajiv Singla . Creation Date: 9/11/2017.\r
52  */\r
53 public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {\r
54 \r
55     private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);\r
56 \r
57     @Property\r
58     private final String tcaAlertsAbatementTableName;\r
59 \r
60     @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)\r
61     protected OutputEmitter<String> alertsAbatementOutputEmitter;\r
62 \r
63     private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;\r
64 \r
65     public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {\r
66         this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;\r
67     }\r
68 \r
69     @Override\r
70     public void configure() {\r
71         setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);\r
72         setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);\r
73     }\r
74 \r
75     @Override\r
76     public void initialize(FlowletContext flowletContext) throws Exception {\r
77         super.initialize(flowletContext);\r
78         tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);\r
79     }\r
80 \r
81     @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)\r
82     public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException {\r
83 \r
84         final String cefMessage = thresholdCalculatorOutput.getCefMessage();\r
85         final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();\r
86         final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();\r
87 \r
88         // alerts must have violated metrics per event name present\r
89         if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {\r
90             final String errorMessage = String.format(\r
91                     "No violated metricsPerEventName found for VES Message: %s." +\r
92                             "Ignored alert message: %s", cefMessage, alertMessageString);\r
93             throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
94         }\r
95 \r
96         final MetricsPerEventName violatedMetricsPerEventName =\r
97                 TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);\r
98         final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);\r
99         final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);\r
100         final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);\r
101         final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();\r
102 \r
103         switch (closedLoopEventStatus) {\r
104 \r
105             case ONSET:\r
106 \r
107                 LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);\r
108                 TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,\r
109                         null, tcaAlertsAbatementTable);\r
110                 LOG.debug("Emitting ONSET alert: {}", alertMessageString);\r
111                 alertsAbatementOutputEmitter.emit(alertMessageString);\r
112                 break;\r
113 \r
114             case ABATED:\r
115 \r
116                 LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);\r
117                 final TCAAlertsAbatementEntity previousAlertsAbatementEntry =\r
118                         TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,\r
119                                 tcaAlertsAbatementTable);\r
120 \r
121                 if (previousAlertsAbatementEntry != null) {\r
122 \r
123                     LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);\r
124 \r
125                     final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();\r
126                     if (abatementSentTS != null) {\r
127                         LOG.debug("Abatement alert was already sent at timestamp: {}. " +\r
128                                 "Skip resending this abatement alert again", abatementSentTS);\r
129                     } else {\r
130 \r
131                         final long newAbatementSentTS = new Date().getTime();\r
132                         LOG.debug(\r
133                                 "No abatement alert was sent before." +\r
134                                         "Sending abatement alert:{} for the first time at:{}",\r
135                                 alertMessageString, newAbatementSentTS);\r
136 \r
137                         // save new Abatement alert sent timestamp in table\r
138                         TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,\r
139                                 Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);\r
140 \r
141                         // Set request id to be same as previous ONSET event request ID\r
142                         tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());\r
143                         final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse);\r
144 \r
145                         LOG.info("Emitting ABATED alert: {}", abatedAlertString);\r
146                         alertsAbatementOutputEmitter.emit(abatedAlertString);\r
147 \r
148                     }\r
149 \r
150                 } else {\r
151                     LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",\r
152                             alertMessageString);\r
153                 }\r
154 \r
155                 break;\r
156 \r
157             default:\r
158 \r
159                 final String errorMessage = String.format(\r
160                         "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +\r
161                                 "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);\r
162                 throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
163 \r
164         }\r
165 \r
166 \r
167     }\r
168 \r
169 }\r