2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;
\r
23 import co.cask.cdap.api.annotation.Output;
\r
24 import co.cask.cdap.api.annotation.ProcessInput;
\r
25 import co.cask.cdap.api.annotation.Property;
\r
26 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
\r
27 import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
\r
28 import co.cask.cdap.api.flow.flowlet.FlowletContext;
\r
29 import co.cask.cdap.api.flow.flowlet.OutputEmitter;
\r
30 import org.apache.commons.lang3.StringUtils;
\r
31 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
\r
32 import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
\r
33 import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
\r
34 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
\r
35 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
\r
36 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
\r
37 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
\r
38 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
\r
39 import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;
\r
40 import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
\r
41 import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;
\r
42 import org.slf4j.Logger;
\r
43 import org.slf4j.LoggerFactory;
\r
45 import java.io.IOException;
\r
46 import java.util.Date;
\r
49 * Flowlet responsible to sending out abatement alerts
\r
51 * @author Rajiv Singla . Creation Date: 9/11/2017.
\r
53 public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
\r
55 private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);
\r
58 private final String tcaAlertsAbatementTableName;
\r
60 @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
\r
61 protected OutputEmitter<String> alertsAbatementOutputEmitter;
\r
63 private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;
\r
65 public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {
\r
66 this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;
\r
70 public void configure() {
\r
71 setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);
\r
72 setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);
\r
76 public void initialize(FlowletContext flowletContext) throws Exception {
\r
77 super.initialize(flowletContext);
\r
78 tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
\r
81 @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
\r
82 public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException {
\r
84 final String cefMessage = thresholdCalculatorOutput.getCefMessage();
\r
85 final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();
\r
86 final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();
\r
88 // alerts must have violated metrics per event name present
\r
89 if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {
\r
90 final String errorMessage = String.format(
\r
91 "No violated metricsPerEventName found for VES Message: %s." +
\r
92 "Ignored alert message: %s", cefMessage, alertMessageString);
\r
93 throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
\r
96 final MetricsPerEventName violatedMetricsPerEventName =
\r
97 TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);
\r
98 final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);
\r
99 final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);
\r
100 final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);
\r
101 final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();
\r
103 switch (closedLoopEventStatus) {
\r
107 LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
\r
108 TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
\r
109 null, tcaAlertsAbatementTable);
\r
110 LOG.debug("Emitting ONSET alert: {}", alertMessageString);
\r
111 alertsAbatementOutputEmitter.emit(alertMessageString);
\r
116 LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
\r
117 final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
\r
118 TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
\r
119 tcaAlertsAbatementTable);
\r
121 if (previousAlertsAbatementEntry != null) {
\r
123 LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);
\r
125 final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();
\r
126 if (abatementSentTS != null) {
\r
127 LOG.debug("Abatement alert was already sent at timestamp: {}. " +
\r
128 "Skip resending this abatement alert again", abatementSentTS);
\r
131 final long newAbatementSentTS = new Date().getTime();
\r
133 "No abatement alert was sent before." +
\r
134 "Sending abatement alert:{} for the first time at:{}",
\r
135 alertMessageString, newAbatementSentTS);
\r
137 // save new Abatement alert sent timestamp in table
\r
138 TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
\r
139 Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
\r
141 // Set request id to be same as previous ONSET event request ID
\r
142 tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());
\r
143 final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse);
\r
145 LOG.info("Emitting ABATED alert: {}", abatedAlertString);
\r
146 alertsAbatementOutputEmitter.emit(abatedAlertString);
\r
151 LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",
\r
152 alertMessageString);
\r
159 final String errorMessage = String.format(
\r
160 "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +
\r
161 "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);
\r
162 throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
\r