9fb9d83d171465f859c32b162d389557596160b4
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / worker / TCADMaaPMRSubscriberJob.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
22
23 import co.cask.cdap.api.metrics.Metrics;
24 import co.cask.cdap.api.worker.WorkerContext;
25 import com.google.common.base.Optional;
26 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
27 import org.openecomp.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils;
28 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
29 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
30 import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
31 import org.quartz.DisallowConcurrentExecution;
32 import org.quartz.Job;
33 import org.quartz.JobDataMap;
34 import org.quartz.JobExecutionContext;
35 import org.quartz.JobExecutionException;
36 import org.quartz.PersistJobDataAfterExecution;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import java.io.IOException;
41 import java.util.List;
42
43 import static java.lang.String.format;
44
45 /**
46  * Quartz Job which polls DMaaP MR VES Collector Topic for messages and writes them to
47  * a given CDAP Stream
48  *
49  * @author Rajiv Singla . Creation Date: 10/24/2016.
50  */
51 @DisallowConcurrentExecution
52 @PersistJobDataAfterExecution
53 public class TCADMaaPMRSubscriberJob implements Job {
54
55     private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRSubscriberJob.class);
56
57     @Override
58     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
59
60         LOG.debug("Starting DMaaP MR Topic Subscriber fetch Job. Next firing time will be: {}",
61                 jobExecutionContext.getNextFireTime());
62
63         // Get Job Data Map
64         final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
65
66         // Fetch all Job Params from Job Data Map
67         final String cdapStreamName = jobDataMap.getString(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME);
68         final WorkerContext workerContext =
69                 (WorkerContext) jobDataMap.get(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME);
70         final DMaaPMRSubscriber subscriber =
71                 (DMaaPMRSubscriber) jobDataMap.get(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME);
72         final Metrics metrics = (Metrics) jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME);
73
74         final Optional<List<String>> subscriberMessagesOptional =
75                 DMaaPMRUtils.getSubscriberMessages(subscriber, metrics);
76
77         // Write message to CDAP Stream using Stream Writer
78         if (subscriberMessagesOptional.isPresent()) {
79             writeMessageToCDAPStream(subscriberMessagesOptional.get(), cdapStreamName, workerContext, metrics);
80         }
81     }
82
83
84     /**
85      * Writes given messages to CDAP Stream
86      *
87      * @param actualMessages List of messages that need to written to cdap stream
88      * @param cdapStreamName cdap stream name
89      * @param workerContext cdap worker context
90      * @param metrics cdap metrics
91      */
92     private void writeMessageToCDAPStream(final List<String> actualMessages, final String cdapStreamName,
93                                           final WorkerContext workerContext, final Metrics metrics) {
94         LOG.debug("Writing message to CDAP Stream: {}, Message Count: {}", cdapStreamName, actualMessages.size());
95         try {
96
97             for (String message : actualMessages) {
98                 workerContext.write(cdapStreamName, message);
99             }
100
101         } catch (IOException e) {
102             metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_FAILURE_TO_WRITE_TO_STREAM_METRIC, 1);
103             final String errorMessage =
104                     format("Error while DMaaP message router subscriber attempting to write to CDAP Stream: %s, " +
105                             "Exception: %s", cdapStreamName, e);
106             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
107         }
108
109         LOG.debug("DMaaP MR Subscriber successfully finished writing messages to CDAP Stream: {}, Message count: {}",
110                 cdapStreamName, actualMessages.size());
111
112     }
113
114 }