2 * ===============================LICENSE_START======================================
\r
4 * ================================================================================
\r
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============================LICENSE_END===========================================
\r
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
\r
23 import co.cask.cdap.api.TxRunnable;
\r
24 import co.cask.cdap.api.common.Bytes;
\r
25 import co.cask.cdap.api.data.DatasetContext;
\r
26 import co.cask.cdap.api.dataset.lib.CloseableIterator;
\r
27 import co.cask.cdap.api.dataset.lib.KeyValue;
\r
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
\r
29 import co.cask.cdap.api.metrics.Metrics;
\r
30 import co.cask.cdap.api.worker.WorkerContext;
\r
31 import com.google.common.base.Joiner;
\r
32 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
\r
33 import org.apache.tephra.TransactionFailureException;
\r
34 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
\r
35 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
\r
36 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
\r
37 import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
\r
38 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
\r
39 import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
\r
40 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
\r
41 import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
\r
42 import org.quartz.DisallowConcurrentExecution;
\r
43 import org.quartz.Job;
\r
44 import org.quartz.JobDataMap;
\r
45 import org.quartz.JobExecutionContext;
\r
46 import org.quartz.JobExecutionException;
\r
47 import org.quartz.PersistJobDataAfterExecution;
\r
48 import org.slf4j.Logger;
\r
49 import org.slf4j.LoggerFactory;
\r
51 import java.util.Date;
\r
52 import java.util.LinkedHashMap;
\r
53 import java.util.List;
\r
54 import java.util.Map;
\r
55 import java.util.Set;
\r
57 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME;
\r
58 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME;
\r
59 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME;
\r
60 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME;
\r
/**
 * Quartz Job that monitors the given TCA Alerts table for new alert messages and, if any are found, publishes them
 * to the DMaaP MR Topic.
 *
 * @author Rajiv Singla . Creation Date: 11/17/2016.
 */
\r
68 @DisallowConcurrentExecution
\r
69 @PersistJobDataAfterExecution
\r
70 @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
\r
71 public class TCADMaaPMRPublisherJob implements Job {
\r
73 private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRPublisherJob.class);
\r
76 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
\r
78 LOG.debug("Starting DMaaP MR Topic Publisher fetch Job. Next firing time will be: {}",
\r
79 jobExecutionContext.getNextFireTime());
\r
82 final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
\r
84 // Fetch all Job Params from Job Data Map
\r
85 final String cdapAlertsTableName = jobDataMap.getString(CDAP_ALERTS_TABLE_VARIABLE_NAME);
\r
86 final WorkerContext workerContext = (WorkerContext) jobDataMap.get(WORKER_CONTEXT_VARIABLE_NAME);
\r
87 final DMaaPMRPublisher publisher = (DMaaPMRPublisher) jobDataMap.get(DMAAP_PUBLISHER_VARIABLE_NAME);
\r
88 final Metrics metrics = (Metrics) jobDataMap.get(DMAAP_METRICS_VARIABLE_NAME);
\r
90 LOG.debug("Start looking for new message in Alerts Table: {}", cdapAlertsTableName);
\r
92 // Get new alerts from alerts table
\r
93 final Map<String, TCAVESAlertEntity> newAlertsMap = getNewAlertsMap(cdapAlertsTableName, workerContext);
\r
95 // If no new alerts are found - nothing to publish
\r
96 if (newAlertsMap.isEmpty()) {
\r
97 LOG.debug("No new alerts found in Alerts Table name: {}. Nothing to Publisher....", cdapAlertsTableName);
\r
98 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NO_NEW_ALERTS_LOOKUP_METRIC, 1);
\r
102 final int newAlertsCount = newAlertsMap.size();
\r
103 LOG.debug("Found new alerts in Alerts Table name: {}. No of new alerts: {}", cdapAlertsTableName,
\r
105 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NEW_ALERTS_METRIC, newAlertsCount);
\r
107 // Get alert message strings from alert Entities
\r
108 final List<String> newAlertsMessages = CDAPTCAUtils.extractAlertFromAlertEntities(newAlertsMap.values());
\r
110 // Publish messages to DMaaP MR Topic
\r
113 final DMaaPMRPublisherResponse publisherResponse = publisher.publish(newAlertsMessages);
\r
115 final Integer responseCode = publisherResponse.getResponseCode();
\r
116 final String responseMessage = publisherResponse.getResponseMessage();
\r
117 final int pendingMessagesCount = publisherResponse.getPendingMessagesCount();
\r
119 LOG.debug("Publisher Response Code: {}, Publisher message: {}, Pending Messages Count: {}", responseCode,
\r
120 responseMessage, pendingMessagesCount);
\r
122 if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
\r
123 LOG.debug("Successfully Published alerts to DMaaP MR Topic.");
\r
124 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_SUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
\r
126 LOG.warn("Unable to publish alerts to DMaaP MR Topic. Publisher will try to send it later....");
\r
127 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_UNSUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
\r
130 } catch (DCAEAnalyticsRuntimeException e) {
\r
131 LOG.error("Exception while publishing messages to DMaaP MR Topic: {}", e);
\r
133 // delete send message from alerts table
\r
134 deleteAlertsByKey(cdapAlertsTableName, workerContext, newAlertsMap.keySet(), metrics);
\r
137 LOG.debug("Finished DMaaP MR Topic Publisher fetch Job.");
\r
142 * Gets New Messages from alerts table as Map with row keys as keys and {@link TCAVESAlertEntity} as values
\r
144 * @param cdapAlertsTableName alerts table name
\r
145 * @param workerContext worker context
\r
146 * @return Map with row keys as keys and {@link TCAVESAlertEntity} as values
\r
148 protected Map<String, TCAVESAlertEntity> getNewAlertsMap(final String cdapAlertsTableName,
\r
149 final WorkerContext workerContext) {
\r
150 final Map<String, TCAVESAlertEntity> newAlertsMap = new LinkedHashMap<>();
\r
152 workerContext.execute(new TxRunnable() {
\r
154 public void run(DatasetContext context) throws Exception {
\r
155 final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
\r
156 final Date currentTime = new Date();
\r
157 final String rowKey = TCAVESAlertsPersister.createRowKey(currentTime);
\r
158 final CloseableIterator<KeyValue<byte[], TCAVESAlertEntity>> scan = alertsTable.scan(null, rowKey);
\r
159 while (scan.hasNext()) {
\r
160 final KeyValue<byte[], TCAVESAlertEntity> alertEntityKeyValue = scan.next();
\r
161 newAlertsMap.put(Bytes.toString(alertEntityKeyValue.getKey()), alertEntityKeyValue.getValue());
\r
165 } catch (TransactionFailureException e) {
\r
166 final String errorMessage = "Transaction Error while getting new alerts from alerts table: " + e.toString();
\r
167 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r
169 return newAlertsMap;
\r
173 * Deletes rows in Alerts table for give rowKeys
\r
175 * @param cdapAlertsTableName CDAP Alerts Table Name
\r
176 * @param workerContext Worker Context
\r
177 * @param rowKeys Row Key Set
\r
178 * @param metrics CDAP metrics
\r
180 protected void deleteAlertsByKey(final String cdapAlertsTableName, final WorkerContext workerContext,
\r
181 final Set<String> rowKeys, final Metrics metrics) {
\r
182 LOG.debug("Deleting Published Alerts from alerts table with rowKeys: {}", Joiner.on(",").join(rowKeys));
\r
184 workerContext.execute(new TxRunnable() {
\r
186 public void run(DatasetContext context) throws Exception {
\r
187 final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
\r
188 for (String rowKey : rowKeys) {
\r
189 alertsTable.delete(rowKey);
\r
190 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_DELETED_ALERTS_METRIC, 1);
\r
194 } catch (TransactionFailureException e) {
\r
195 final String errorMessage =
\r
196 "Transaction Error while deleting published alerts in alerts table: " + e.toString();
\r
197 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
\r