2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
23 import co.cask.cdap.api.TxRunnable;
24 import co.cask.cdap.api.common.Bytes;
25 import co.cask.cdap.api.data.DatasetContext;
26 import co.cask.cdap.api.dataset.lib.CloseableIterator;
27 import co.cask.cdap.api.dataset.lib.KeyValue;
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
29 import co.cask.cdap.api.metrics.Metrics;
30 import co.cask.cdap.api.worker.WorkerContext;
31 import com.google.common.base.Joiner;
32 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
33 import org.apache.tephra.TransactionFailureException;
34 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
35 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
36 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
37 import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
38 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
39 import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
40 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
41 import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
42 import org.quartz.DisallowConcurrentExecution;
43 import org.quartz.Job;
44 import org.quartz.JobDataMap;
45 import org.quartz.JobExecutionContext;
46 import org.quartz.JobExecutionException;
47 import org.quartz.PersistJobDataAfterExecution;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import java.util.Date;
52 import java.util.LinkedHashMap;
53 import java.util.List;
57 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME;
58 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME;
59 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME;
60 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME;
63 * Quartz Job that will monitor any new alert messages in given TCA Alerts table and if any found publish them to
66 * @author Rajiv Singla . Creation Date: 11/17/2016.
68 @DisallowConcurrentExecution
69 @PersistJobDataAfterExecution
70 @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
71 public class TCADMaaPMRPublisherJob implements Job {
73 private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRPublisherJob.class);
76 public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
78 LOG.debug("Starting DMaaP MR Topic Publisher fetch Job. Next firing time will be: {}",
79 jobExecutionContext.getNextFireTime());
82 final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
84 // Fetch all Job Params from Job Data Map
85 final String cdapAlertsTableName = jobDataMap.getString(CDAP_ALERTS_TABLE_VARIABLE_NAME);
86 final WorkerContext workerContext = (WorkerContext) jobDataMap.get(WORKER_CONTEXT_VARIABLE_NAME);
87 final DMaaPMRPublisher publisher = (DMaaPMRPublisher) jobDataMap.get(DMAAP_PUBLISHER_VARIABLE_NAME);
88 final Metrics metrics = (Metrics) jobDataMap.get(DMAAP_METRICS_VARIABLE_NAME);
90 LOG.debug("Start looking for new message in Alerts Table: {}", cdapAlertsTableName);
92 // Get new alerts from alerts table
93 final Map<String, TCAVESAlertEntity> newAlertsMap = getNewAlertsMap(cdapAlertsTableName, workerContext);
95 // If no new alerts are found - nothing to publish
96 if (newAlertsMap.isEmpty()) {
97 LOG.debug("No new alerts found in Alerts Table name: {}. Nothing to Publisher....", cdapAlertsTableName);
98 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NO_NEW_ALERTS_LOOKUP_METRIC, 1);
102 final int newAlertsCount = newAlertsMap.size();
103 LOG.debug("Found new alerts in Alerts Table name: {}. No of new alerts: {}", cdapAlertsTableName,
105 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NEW_ALERTS_METRIC, newAlertsCount);
107 // Get alert message strings from alert Entities
108 final List<String> newAlertsMessages = CDAPTCAUtils.extractAlertFromAlertEntities(newAlertsMap.values());
110 // Publish messages to DMaaP MR Topic
113 final DMaaPMRPublisherResponse publisherResponse = publisher.publish(newAlertsMessages);
115 final Integer responseCode = publisherResponse.getResponseCode();
116 final String responseMessage = publisherResponse.getResponseMessage();
117 final int pendingMessagesCount = publisherResponse.getPendingMessagesCount();
119 LOG.debug("Publisher Response Code: {}, Publisher message: {}, Pending Messages Count: {}", responseCode,
120 responseMessage, pendingMessagesCount);
122 if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
123 LOG.debug("Successfully Published alerts to DMaaP MR Topic.");
124 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_SUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
126 LOG.warn("Unable to publish alerts to DMaaP MR Topic. Publisher will try to send it later....");
127 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_UNSUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
130 } catch (DCAEAnalyticsRuntimeException e) {
131 LOG.error("Exception while publishing messages to DMaaP MR Topic: {}", e);
133 // delete send message from alerts table
134 deleteAlertsByKey(cdapAlertsTableName, workerContext, newAlertsMap.keySet(), metrics);
137 LOG.debug("Finished DMaaP MR Topic Publisher fetch Job.");
142 * Gets New Messages from alerts table as Map with row keys as keys and {@link TCAVESAlertEntity} as values
144 * @param cdapAlertsTableName alerts table name
145 * @param workerContext worker context
146 * @return Map with row keys as keys and {@link TCAVESAlertEntity} as values
148 protected Map<String, TCAVESAlertEntity> getNewAlertsMap(final String cdapAlertsTableName,
149 final WorkerContext workerContext) {
150 final Map<String, TCAVESAlertEntity> newAlertsMap = new LinkedHashMap<>();
152 workerContext.execute(new TxRunnable() {
154 public void run(DatasetContext context) throws Exception {
155 final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
156 final Date currentTime = new Date();
157 final String rowKey = TCAVESAlertsPersister.createRowKey(currentTime);
158 final CloseableIterator<KeyValue<byte[], TCAVESAlertEntity>> scan = alertsTable.scan(null, rowKey);
159 while (scan.hasNext()) {
160 final KeyValue<byte[], TCAVESAlertEntity> alertEntityKeyValue = scan.next();
161 newAlertsMap.put(Bytes.toString(alertEntityKeyValue.getKey()), alertEntityKeyValue.getValue());
165 } catch (TransactionFailureException e) {
166 final String errorMessage = "Transaction Error while getting new alerts from alerts table: " + e.toString();
167 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
173 * Deletes rows in Alerts table for give rowKeys
175 * @param cdapAlertsTableName CDAP Alerts Table Name
176 * @param workerContext Worker Context
177 * @param rowKeys Row Key Set
178 * @param metrics CDAP metrics
180 protected void deleteAlertsByKey(final String cdapAlertsTableName, final WorkerContext workerContext,
181 final Set<String> rowKeys, final Metrics metrics) {
182 LOG.debug("Deleting Published Alerts from alerts table with rowKeys: {}", Joiner.on(",").join(rowKeys));
184 workerContext.execute(new TxRunnable() {
186 public void run(DatasetContext context) throws Exception {
187 final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
188 for (String rowKey : rowKeys) {
189 alertsTable.delete(rowKey);
190 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_DELETED_ALERTS_METRIC, 1);
194 } catch (TransactionFailureException e) {
195 final String errorMessage =
196 "Transaction Error while deleting published alerts in alerts table: " + e.toString();
197 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);