-/*\r
- * ===============================LICENSE_START======================================\r
- * dcae-analytics\r
- * ================================================================================\r
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============================LICENSE_END===========================================\r
- */\r
-\r
-package org.openecomp.dcae.apod.analytics.cdap.tca.flowlet;\r
-\r
-import co.cask.cdap.api.annotation.Output;\r
-import co.cask.cdap.api.annotation.ProcessInput;\r
-import co.cask.cdap.api.annotation.Property;\r
-import co.cask.cdap.api.dataset.lib.ObjectMappedTable;\r
-import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;\r
-import co.cask.cdap.api.flow.flowlet.FlowletContext;\r
-import co.cask.cdap.api.flow.flowlet.OutputEmitter;\r
-import org.apache.commons.lang3.StringUtils;\r
-import org.openecomp.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;\r
-import org.openecomp.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;\r
-import org.openecomp.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;\r
-import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;\r
-import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;\r
-import org.openecomp.dcae.apod.analytics.model.domain.policy.tca.Threshold;\r
-import org.openecomp.dcae.apod.analytics.model.facade.tca.TCAVESResponse;\r
-import org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import java.io.IOException;\r
-import java.util.Date;\r
-\r
-/**\r
- * Flowlet responsible to sending out abatement alerts\r
- *\r
- * @author Rajiv Singla . Creation Date: 9/11/2017.\r
- */\r
-public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);\r
-\r
- @Property\r
- private final String tcaAlertsAbatementTableName;\r
-\r
- @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)\r
- protected OutputEmitter<String> alertsAbatementOutputEmitter;\r
-\r
- private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;\r
-\r
- public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {\r
- this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;\r
- }\r
-\r
- @Override\r
- public void configure() {\r
- setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);\r
- setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);\r
- }\r
-\r
- @Override\r
- public void initialize(FlowletContext flowletContext) throws Exception {\r
- super.initialize(flowletContext);\r
- tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);\r
- }\r
-\r
- @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)\r
- public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException {\r
-\r
- final String cefMessage = thresholdCalculatorOutput.getCefMessage();\r
- final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();\r
- final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();\r
-\r
- // alerts must have violated metrics per event name present\r
- if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {\r
- final String errorMessage = String.format(\r
- "No violated metricsPerEventName found for VES Message: %s." +\r
- "Ignored alert message: %s", cefMessage, alertMessageString);\r
- throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
- }\r
-\r
- final MetricsPerEventName violatedMetricsPerEventName =\r
- TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);\r
- final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);\r
- final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);\r
- final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);\r
- final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();\r
-\r
- switch (closedLoopEventStatus) {\r
-\r
- case ONSET:\r
-\r
- LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);\r
- TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,\r
- null, tcaAlertsAbatementTable);\r
- LOG.debug("Emitting ONSET alert: {}", alertMessageString);\r
- alertsAbatementOutputEmitter.emit(alertMessageString);\r
- break;\r
-\r
- case ABATED:\r
-\r
- LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);\r
- final TCAAlertsAbatementEntity previousAlertsAbatementEntry =\r
- TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,\r
- tcaAlertsAbatementTable);\r
-\r
- if (previousAlertsAbatementEntry != null) {\r
-\r
- LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);\r
-\r
- final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();\r
- if (abatementSentTS != null) {\r
- LOG.debug("Abatement alert was already sent at timestamp: {}. " +\r
- "Skip resending this abatement alert again", abatementSentTS);\r
- } else {\r
-\r
- final long newAbatementSentTS = new Date().getTime();\r
- LOG.debug(\r
- "No abatement alert was sent before." +\r
- "Sending abatement alert:{} for the first time at:{}",\r
- alertMessageString, newAbatementSentTS);\r
-\r
- // save new Abatement alert sent timestamp in table\r
- TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,\r
- Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);\r
-\r
- // Set request id to be same as previous ONSET event request ID\r
- tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());\r
- final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse);\r
-\r
- LOG.info("Emitting ABATED alert: {}", abatedAlertString);\r
- alertsAbatementOutputEmitter.emit(abatedAlertString);\r
-\r
- }\r
-\r
- } else {\r
- LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",\r
- alertMessageString);\r
- }\r
-\r
- break;\r
-\r
- default:\r
-\r
- final String errorMessage = String.format(\r
- "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +\r
- "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);\r
- throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));\r
-\r
- }\r
-\r
-\r
- }\r
-\r
-}\r
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.tca.flowlet;
+
+import co.cask.cdap.api.annotation.Output;
+import co.cask.cdap.api.annotation.ProcessInput;
+import co.cask.cdap.api.annotation.Property;
+import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
+import co.cask.cdap.api.flow.flowlet.AbstractFlowlet;
+import co.cask.cdap.api.flow.flowlet.FlowletContext;
+import co.cask.cdap.api.flow.flowlet.OutputEmitter;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.dcae.apod.analytics.cdap.common.CDAPComponentsConstants;
+import org.onap.dcae.apod.analytics.cdap.common.domain.tca.ThresholdCalculatorOutput;
+import org.onap.dcae.apod.analytics.cdap.common.exception.CDAPSettingsException;
+import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementEntity;
+import org.onap.dcae.apod.analytics.cdap.common.persistance.tca.TCAAlertsAbatementPersister;
+import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.ClosedLoopEventStatus;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.MetricsPerEventName;
+import org.onap.dcae.apod.analytics.model.domain.policy.tca.Threshold;
+import org.onap.dcae.apod.analytics.model.facade.tca.TCAVESResponse;
+import org.onap.dcae.apod.analytics.tca.utils.TCAUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+
+/**
+ * Flowlet responsible to sending out abatement alerts
+ *
+ * @author Rajiv Singla . Creation Date: 9/11/2017.
+ */
+public class TCAVESAlertsAbatementFlowlet extends AbstractFlowlet {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TCAVESAlertsAbatementFlowlet.class);
+
+ @Property
+ private final String tcaAlertsAbatementTableName;
+
+ @Output(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_OUTPUT)
+ protected OutputEmitter<String> alertsAbatementOutputEmitter;
+
+ private ObjectMappedTable<TCAAlertsAbatementEntity> tcaAlertsAbatementTable;
+
+ public TCAVESAlertsAbatementFlowlet(final String tcaAlertsAbatementTableName) {
+ this.tcaAlertsAbatementTableName = tcaAlertsAbatementTableName;
+ }
+
+ @Override
+ public void configure() {
+ setName(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_NAME_FLOWLET);
+ setDescription(CDAPComponentsConstants.TCA_FIXED_VES_ALERTS_ABATEMENT_DESCRIPTION_FLOWLET);
+ }
+
+ @Override
+ public void initialize(FlowletContext flowletContext) throws Exception {
+ super.initialize(flowletContext);
+ tcaAlertsAbatementTable = getContext().getDataset(tcaAlertsAbatementTableName);
+ }
+
+ @ProcessInput(CDAPComponentsConstants.TCA_FIXED_VES_TCA_CALCULATOR_NAME_OUTPUT)
+ public void determineAbatementAlerts(final ThresholdCalculatorOutput thresholdCalculatorOutput) throws IOException {
+
+ final String cefMessage = thresholdCalculatorOutput.getCefMessage();
+ final String alertMessageString = thresholdCalculatorOutput.getAlertMessage();
+ final String violatedMetricsPerEventNameString = thresholdCalculatorOutput.getViolatedMetricsPerEventName();
+
+ // alerts must have violated metrics per event name present
+ if (StringUtils.isBlank(violatedMetricsPerEventNameString)) {
+ final String errorMessage = String.format(
+ "No violated metricsPerEventName found for VES Message: %s." +
+ "Ignored alert message: %s", cefMessage, alertMessageString);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
+ }
+
+ final MetricsPerEventName violatedMetricsPerEventName =
+ TCAUtils.readValue(violatedMetricsPerEventNameString, MetricsPerEventName.class);
+ final EventListener eventListener = TCAUtils.readValue(cefMessage, EventListener.class);
+ final TCAVESResponse tcavesResponse = TCAUtils.readValue(alertMessageString, TCAVESResponse.class);
+ final Threshold violatedThreshold = violatedMetricsPerEventName.getThresholds().get(0);
+ final ClosedLoopEventStatus closedLoopEventStatus = violatedThreshold.getClosedLoopEventStatus();
+
+ switch (closedLoopEventStatus) {
+
+ case ONSET:
+
+ LOG.debug("Saving information for ONSET event for cefMessage: {}", cefMessage);
+ TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
+ null, tcaAlertsAbatementTable);
+ LOG.debug("Emitting ONSET alert: {}", alertMessageString);
+ alertsAbatementOutputEmitter.emit(alertMessageString);
+ break;
+
+ case ABATED:
+
+ LOG.debug("Looking up previous sent alert for abated threshold: {}", violatedThreshold);
+ final TCAAlertsAbatementEntity previousAlertsAbatementEntry =
+ TCAAlertsAbatementPersister.lookUpByKey(eventListener, violatedMetricsPerEventName,
+ tcaAlertsAbatementTable);
+
+ if (previousAlertsAbatementEntry != null) {
+
+ LOG.debug("Found previous AlertsAbatementEntity: {}", previousAlertsAbatementEntry);
+
+ final String abatementSentTS = previousAlertsAbatementEntry.getAbatementSentTS();
+ if (abatementSentTS != null) {
+ LOG.debug("Abatement alert was already sent at timestamp: {}. " +
+ "Skip resending this abatement alert again", abatementSentTS);
+ } else {
+
+ final long newAbatementSentTS = new Date().getTime();
+ LOG.debug(
+ "No abatement alert was sent before." +
+ "Sending abatement alert:{} for the first time at:{}",
+ alertMessageString, newAbatementSentTS);
+
+ // save new Abatement alert sent timestamp in table
+ TCAAlertsAbatementPersister.persist(eventListener, violatedMetricsPerEventName, tcavesResponse,
+ Long.toString(newAbatementSentTS), tcaAlertsAbatementTable);
+
+ // Set request id to be same as previous ONSET event request ID
+ tcavesResponse.setRequestID(previousAlertsAbatementEntry.getRequestId());
+ final String abatedAlertString = TCAUtils.writeValueAsString(tcavesResponse);
+
+ LOG.info("Emitting ABATED alert: {}", abatedAlertString);
+ alertsAbatementOutputEmitter.emit(abatedAlertString);
+
+ }
+
+ } else {
+ LOG.info("No previous ONSET alert was found for this ABATED alert: {}.Skip sending abated alert.",
+ alertMessageString);
+ }
+
+ break;
+
+ default:
+
+ final String errorMessage = String.format(
+ "Unexpected ClosedLoopEventStatus: %s. Only ONSET and ABATED are supported." +
+ "Ignoring alert: %s", closedLoopEventStatus, alertMessageString);
+ throw new CDAPSettingsException(errorMessage, LOG, new IllegalStateException(errorMessage));
+
+ }
+
+
+ }
+
+}