Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / worker / TCADMaaPMRPublisherJob.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
22
23 import co.cask.cdap.api.TxRunnable;
24 import co.cask.cdap.api.common.Bytes;
25 import co.cask.cdap.api.data.DatasetContext;
26 import co.cask.cdap.api.dataset.lib.CloseableIterator;
27 import co.cask.cdap.api.dataset.lib.KeyValue;
28 import co.cask.cdap.api.dataset.lib.ObjectMappedTable;
29 import co.cask.cdap.api.metrics.Metrics;
30 import co.cask.cdap.api.worker.WorkerContext;
31 import com.google.common.base.Joiner;
32 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
33 import org.apache.tephra.TransactionFailureException;
34 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
35 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertEntity;
36 import org.openecomp.dcae.apod.analytics.cdap.common.persistance.tca.TCAVESAlertsPersister;
37 import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
38 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
39 import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
40 import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRPublisherResponse;
41 import org.openecomp.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
42 import org.quartz.DisallowConcurrentExecution;
43 import org.quartz.Job;
44 import org.quartz.JobDataMap;
45 import org.quartz.JobExecutionContext;
46 import org.quartz.JobExecutionException;
47 import org.quartz.PersistJobDataAfterExecution;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import java.util.Date;
52 import java.util.LinkedHashMap;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.Set;
56
57 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.CDAP_ALERTS_TABLE_VARIABLE_NAME;
58 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME;
59 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.DMAAP_PUBLISHER_VARIABLE_NAME;
60 import static org.openecomp.dcae.apod.analytics.common.AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME;
61
62 /**
63  * Quartz Job that will monitor any new alert messages in given TCA Alerts table and if any found publish them to
64  * DMaaP MR topic
65  *<p>
66  * @author Rajiv Singla . Creation Date: 11/17/2016.
67  */
68 @DisallowConcurrentExecution
69 @PersistJobDataAfterExecution
70 @SuppressFBWarnings("SIC_INNER_SHOULD_BE_STATIC_ANON")
71 public class TCADMaaPMRPublisherJob implements Job {
72
73     private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRPublisherJob.class);
74
75     @Override
76     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
77
78         LOG.debug("Starting DMaaP MR Topic Publisher fetch Job. Next firing time will be: {}",
79                 jobExecutionContext.getNextFireTime());
80
81         // Get Job Data Map
82         final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
83
84         // Fetch all Job Params from Job Data Map
85         final String cdapAlertsTableName = jobDataMap.getString(CDAP_ALERTS_TABLE_VARIABLE_NAME);
86         final WorkerContext workerContext = (WorkerContext) jobDataMap.get(WORKER_CONTEXT_VARIABLE_NAME);
87         final DMaaPMRPublisher publisher = (DMaaPMRPublisher) jobDataMap.get(DMAAP_PUBLISHER_VARIABLE_NAME);
88         final Metrics metrics = (Metrics) jobDataMap.get(DMAAP_METRICS_VARIABLE_NAME);
89
90         LOG.debug("Start looking for new message in Alerts Table: {}", cdapAlertsTableName);
91
92         // Get new alerts from alerts table
93         final Map<String, TCAVESAlertEntity> newAlertsMap = getNewAlertsMap(cdapAlertsTableName, workerContext);
94
95         // If no new alerts are found - nothing to publish
96         if (newAlertsMap.isEmpty()) {
97             LOG.debug("No new alerts found in Alerts Table name: {}. Nothing to Publisher....", cdapAlertsTableName);
98             metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NO_NEW_ALERTS_LOOKUP_METRIC, 1);
99             return;
100         }
101
102         final int newAlertsCount = newAlertsMap.size();
103         LOG.debug("Found new alerts in Alerts Table name: {}. No of new alerts: {}", cdapAlertsTableName,
104                 newAlertsCount);
105         metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_NEW_ALERTS_METRIC, newAlertsCount);
106
107         // Get alert message strings from alert Entities
108         final List<String> newAlertsMessages = CDAPTCAUtils.extractAlertFromAlertEntities(newAlertsMap.values());
109
110         // Publish messages to DMaaP MR Topic
111         try {
112
113             final DMaaPMRPublisherResponse publisherResponse = publisher.publish(newAlertsMessages);
114
115             final Integer responseCode = publisherResponse.getResponseCode();
116             final String responseMessage = publisherResponse.getResponseMessage();
117             final int pendingMessagesCount = publisherResponse.getPendingMessagesCount();
118
119             LOG.debug("Publisher Response Code: {}, Publisher message: {}, Pending Messages Count: {}", responseCode,
120                     responseMessage, pendingMessagesCount);
121
122             if (HTTPUtils.isSuccessfulResponseCode(responseCode)) {
123                 LOG.debug("Successfully Published alerts to DMaaP MR Topic.");
124                 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_SUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
125             } else {
126                 LOG.warn("Unable to publish alerts to DMaaP MR Topic. Publisher will try to send it later....");
127                 metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_UNSUCCESSFUL_DMAAP_RESPONSE_METRIC, 1);
128             }
129
130         } catch (DCAEAnalyticsRuntimeException e) {
131             LOG.error("Exception while publishing messages to DMaaP MR Topic: {}", e);
132         } finally {
133             // delete send message from alerts table
134             deleteAlertsByKey(cdapAlertsTableName, workerContext, newAlertsMap.keySet(), metrics);
135         }
136
137         LOG.debug("Finished DMaaP MR Topic Publisher fetch Job.");
138
139     }
140
141     /**
142      * Gets New Messages from alerts table as Map with row keys as keys and {@link TCAVESAlertEntity} as values
143      *
144      * @param cdapAlertsTableName alerts table name
145      * @param workerContext worker context
146      * @return Map with row keys as keys and {@link TCAVESAlertEntity} as values
147      */
148     protected Map<String, TCAVESAlertEntity> getNewAlertsMap(final String cdapAlertsTableName,
149                                                            final WorkerContext workerContext) {
150         final Map<String, TCAVESAlertEntity> newAlertsMap = new LinkedHashMap<>();
151         try {
152             workerContext.execute(new TxRunnable() {
153                 @Override
154                 public void run(DatasetContext context) throws Exception {
155                     final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
156                     final Date currentTime = new Date();
157                     final String rowKey = TCAVESAlertsPersister.createRowKey(currentTime);
158                     final CloseableIterator<KeyValue<byte[], TCAVESAlertEntity>> scan = alertsTable.scan(null, rowKey);
159                     while (scan.hasNext()) {
160                         final KeyValue<byte[], TCAVESAlertEntity> alertEntityKeyValue = scan.next();
161                         newAlertsMap.put(Bytes.toString(alertEntityKeyValue.getKey()), alertEntityKeyValue.getValue());
162                     }
163                 }
164             });
165         } catch (TransactionFailureException e) {
166             final String errorMessage = "Transaction Error while getting new alerts from alerts table: " + e.toString();
167             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
168         }
169         return newAlertsMap;
170     }
171
172     /**
173      * Deletes rows in Alerts table for give rowKeys
174      *
175      * @param cdapAlertsTableName CDAP Alerts Table Name
176      * @param workerContext Worker Context
177      * @param rowKeys Row Key Set
178      * @param metrics CDAP metrics
179      */
180     protected void deleteAlertsByKey(final String cdapAlertsTableName, final WorkerContext workerContext,
181                                    final Set<String> rowKeys, final Metrics metrics) {
182         LOG.debug("Deleting Published Alerts from alerts table with rowKeys: {}", Joiner.on(",").join(rowKeys));
183         try {
184             workerContext.execute(new TxRunnable() {
185                 @Override
186                 public void run(DatasetContext context) throws Exception {
187                     final ObjectMappedTable<TCAVESAlertEntity> alertsTable = context.getDataset(cdapAlertsTableName);
188                     for (String rowKey : rowKeys) {
189                         alertsTable.delete(rowKey);
190                         metrics.count(CDAPMetricsConstants.TCA_PUBLISHER_DELETED_ALERTS_METRIC, 1);
191                     }
192                 }
193             });
194         } catch (TransactionFailureException e) {
195             final String errorMessage =
196                     "Transaction Error while deleting published alerts in alerts table: " + e.toString();
197             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
198         }
199     }
200 }