Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / worker / BaseTCADMaaPMRWorker.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;\r
22 \r
23 import co.cask.cdap.api.worker.AbstractWorker;\r
24 import com.google.common.base.Preconditions;\r
25 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;\r
26 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
27 import org.quartz.Scheduler;\r
28 import org.quartz.SchedulerException;\r
29 import org.slf4j.Logger;\r
30 import org.slf4j.LoggerFactory;\r
31 \r
32 import java.util.concurrent.atomic.AtomicBoolean;\r
33 \r
34 import static java.lang.String.format;\r
35 \r
36 /**\r
37  * Base logic for DMaaP Workers which uses scheduler to poll DMaaP MR topics at frequent intervals\r
38  * <p>\r
39  * @author Rajiv Singla . Creation Date: 12/19/2016.\r
40  */\r
41 public abstract class BaseTCADMaaPMRWorker extends AbstractWorker {\r
42 \r
43     private static final Logger LOG = LoggerFactory.getLogger(BaseTCADMaaPMRWorker.class);\r
44 \r
45     /**\r
46      * Quartz Scheduler\r
47      */\r
48     protected Scheduler scheduler;\r
49     /**\r
50      * Determines if scheduler is shutdown\r
51      */\r
52     protected AtomicBoolean isSchedulerShutdown;\r
53 \r
54 \r
55     @Override\r
56     public void run() {\r
57 \r
58         Preconditions.checkNotNull(scheduler, "Scheduler must not be null");\r
59         String schedulerName = "";\r
60 \r
61         // Start scheduler\r
62         try {\r
63             schedulerName = scheduler.getSchedulerName();\r
64             scheduler.start();\r
65             isSchedulerShutdown.getAndSet(false);\r
66 \r
67         } catch (SchedulerException e) {\r
68             final String errorMessage =\r
69                     format("Error while starting TCA DMaaP MR scheduler name: %s, error: %s", schedulerName, e);\r
70             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
71         }\r
72 \r
73         LOG.info("Successfully started DMaaP MR Scheduler: {}", schedulerName);\r
74 \r
75         // indefinite loop which wakes up and confirms scheduler is indeed running\r
76         while (!isSchedulerShutdown.get()) {\r
77             try {\r
78 \r
79                 Thread.sleep(AnalyticsConstants.TCA_DEFAULT_WORKER_SHUTDOWN_CHECK_INTERVAL_MS);\r
80 \r
81             } catch (InterruptedException e) {\r
82 \r
83                 final String errorMessage =\r
84                         format("Error while checking TCA DMaaP MR Scheduler worker status name: %s, error: %s",\r
85                                 schedulerName, e);\r
86                 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
87             }\r
88         }\r
89 \r
90         LOG.info("Finished execution of TCA DMaaP MR worker thread: {}", schedulerName);\r
91 \r
92     }\r
93 \r
94     @Override\r
95     public void stop() {\r
96 \r
97         Preconditions.checkNotNull(scheduler, "Scheduler must not be null");\r
98         String schedulerName = "";\r
99 \r
100         // Stop Scheduler\r
101         try {\r
102             schedulerName = scheduler.getSchedulerName();\r
103             LOG.info("Shutting TCA DMaaP MR Scheduler: {}", schedulerName);\r
104             scheduler.shutdown();\r
105             isSchedulerShutdown.getAndSet(true);\r
106 \r
107         } catch (SchedulerException e) {\r
108 \r
109             final String errorMessage =\r
110                     format("Error while shutting down TCA DMaaP MR Scheduler: name: %s, error: %s", schedulerName, e);\r
111             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);\r
112         }\r
113     }\r
114 \r
115 \r
116 }\r