Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / worker / TCADMaaPMRSubscriberJob.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;\r
22 \r
23 import co.cask.cdap.api.metrics.Metrics;\r
24 import co.cask.cdap.api.worker.WorkerContext;\r
25 import com.google.common.base.Optional;\r
26 import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;\r
27 import org.openecomp.dcae.apod.analytics.cdap.common.utils.DMaaPMRUtils;\r
28 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
29 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
30 import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;\r
31 import org.quartz.DisallowConcurrentExecution;\r
32 import org.quartz.Job;\r
33 import org.quartz.JobDataMap;\r
34 import org.quartz.JobExecutionContext;\r
35 import org.quartz.JobExecutionException;\r
36 import org.quartz.PersistJobDataAfterExecution;\r
37 import org.slf4j.Logger;\r
38 import org.slf4j.LoggerFactory;\r
39 \r
40 import java.io.IOException;\r
41 import java.util.List;\r
42 \r
43 import static java.lang.String.format;\r
44 \r
45 /**\r
46  * Quartz Job which polls DMaaP MR VES Collector Topic for messages and writes them to\r
47  * a given CDAP Stream\r
48  *\r
49  * @author Rajiv Singla . Creation Date: 10/24/2016.\r
50  */\r
51 @DisallowConcurrentExecution\r
52 @PersistJobDataAfterExecution\r
53 public class TCADMaaPMRSubscriberJob implements Job {\r
54 \r
55     private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRSubscriberJob.class);\r
56 \r
57     @Override\r
58     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {\r
59 \r
60         LOG.debug("Starting DMaaP MR Topic Subscriber fetch Job. Next firing time will be: {}",\r
61                 jobExecutionContext.getNextFireTime());\r
62 \r
63         // Get Job Data Map\r
64         final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();\r
65 \r
66         // Fetch all Job Params from Job Data Map\r
67         final String cdapStreamName = jobDataMap.getString(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME);\r
68         final WorkerContext workerContext =\r
69                 (WorkerContext) jobDataMap.get(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME);\r
70         final DMaaPMRSubscriber subscriber =\r
71                 (DMaaPMRSubscriber) jobDataMap.get(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME);\r
72         final Metrics metrics = (Metrics) jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME);\r
73 \r
74         final Optional<List<String>> subscriberMessagesOptional =\r
75                 DMaaPMRUtils.getSubscriberMessages(subscriber, metrics);\r
76 \r
77         // Write message to CDAP Stream using Stream Writer\r
78         if (subscriberMessagesOptional.isPresent()) {\r
79             writeMessageToCDAPStream(subscriberMessagesOptional.get(), cdapStreamName, workerContext, metrics);\r
80         }\r
81     }\r
82 \r
83 \r
84     /**\r
85      * Writes given messages to CDAP Stream\r
86      *\r
87      * @param actualMessages List of messages that need to written to cdap stream\r
88      * @param cdapStreamName cdap stream name\r
89      * @param workerContext cdap worker context\r
90      * @param metrics cdap metrics\r
91      */\r
92     private void writeMessageToCDAPStream(final List<String> actualMessages, final String cdapStreamName,\r
93                                           final WorkerContext workerContext, final Metrics metrics) {\r
94         LOG.debug("Writing message to CDAP Stream: {}, Message Count: {}", cdapStreamName, actualMessages.size());\r
95         try {\r
96 \r
97             for (String message : actualMessages) {\r
98                 workerContext.write(cdapStreamName, message);\r
99             }\r
100 \r
101         } catch (IOException e) {\r
102             metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_FAILURE_TO_WRITE_TO_STREAM_METRIC, 1);\r
103             final String errorMessage =\r
104                     format("Error while DMaaP message router subscriber attempting to write to CDAP Stream: %s, " +\r
105                             "Exception: %s", cdapStreamName, e);\r
106             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
107         }\r
108 \r
109         LOG.debug("DMaaP MR Subscriber successfully finished writing messages to CDAP Stream: {}, Message count: {}",\r
110                 cdapStreamName, actualMessages.size());\r
111 \r
112     }\r
113 \r
114 }\r