7681f5b85cfe297cf120d2533268d146b7ce923c
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / onap / dcae / apod / analytics / cdap / tca / worker / BaseTCADMaaPMRWorker.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.onap.dcae.apod.analytics.cdap.tca.worker;
22
23 import co.cask.cdap.api.worker.AbstractWorker;
24 import com.google.common.base.Preconditions;
25 import org.onap.dcae.apod.analytics.common.AnalyticsConstants;
26 import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
27 import org.quartz.Scheduler;
28 import org.quartz.SchedulerException;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 import static java.lang.String.format;
35
36 /**
37  * Base logic for DMaaP Workers which uses scheduler to poll DMaaP MR topics at frequent intervals
38  * <p>
39  * @author Rajiv Singla . Creation Date: 12/19/2016.
40  */
41 public abstract class BaseTCADMaaPMRWorker extends AbstractWorker {
42
43     private static final Logger LOG = LoggerFactory.getLogger(BaseTCADMaaPMRWorker.class);
44
45     /**
46      * Quartz Scheduler
47      */
48     protected Scheduler scheduler;
49     /**
50      * Determines if scheduler is shutdown
51      */
52     protected AtomicBoolean isSchedulerShutdown;
53
54
55     @Override
56     public void run() {
57
58         Preconditions.checkNotNull(scheduler, "Scheduler must not be null");
59         String schedulerName = "";
60
61         // Start scheduler
62         try {
63             schedulerName = scheduler.getSchedulerName();
64             scheduler.start();
65             isSchedulerShutdown.getAndSet(false);
66
67         } catch (SchedulerException e) {
68             final String errorMessage =
69                     format("Error while starting TCA DMaaP MR scheduler name: %s, error: %s", schedulerName, e);
70             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
71         }
72
73         LOG.info("Successfully started DMaaP MR Scheduler: {}", schedulerName);
74
75         // indefinite loop which wakes up and confirms scheduler is indeed running
76         while (!isSchedulerShutdown.get()) {
77             try {
78
79                 Thread.sleep(AnalyticsConstants.TCA_DEFAULT_WORKER_SHUTDOWN_CHECK_INTERVAL_MS);
80
81             } catch (InterruptedException e) {
82
83                 final String errorMessage =
84                         format("Error while checking TCA DMaaP MR Scheduler worker status name: %s, error: %s",
85                                 schedulerName, e);
86                 Thread.currentThread().interrupt();
87                 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
88             }
89         }
90
91         LOG.info("Finished execution of TCA DMaaP MR worker thread: {}", schedulerName);
92
93     }
94
95     @Override
96     public void stop() {
97
98         Preconditions.checkNotNull(scheduler, "Scheduler must not be null");
99         String schedulerName = "";
100
101         // Stop Scheduler
102         try {
103             schedulerName = scheduler.getSchedulerName();
104             LOG.info("Shutting TCA DMaaP MR Scheduler: {}", schedulerName);
105             scheduler.shutdown();
106             isSchedulerShutdown.getAndSet(true);
107
108         } catch (SchedulerException e) {
109
110             final String errorMessage =
111                     format("Error while shutting down TCA DMaaP MR Scheduler: name: %s, error: %s", schedulerName, e);
112             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
113         }
114     }
115
116
117 }