2  * ===============================LICENSE_START======================================
 
   4  * ================================================================================
 
   5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  *  Licensed under the Apache License, Version 2.0 (the "License");
 
   8  *  you may not use this file except in compliance with the License.
 
   9  *   You may obtain a copy of the License at
 
  11  *          http://www.apache.org/licenses/LICENSE-2.0
 
  13  *  Unless required by applicable law or agreed to in writing, software
 
  14  *  distributed under the License is distributed on an "AS IS" BASIS,
 
  15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  *  See the License for the specific language governing permissions and
 
  17  *  limitations under the License.
 
  18  *  ============================LICENSE_END===========================================
 
  21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
 
  23 import co.cask.cdap.api.worker.AbstractWorker;
 
  24 import com.google.common.base.Preconditions;
 
  25 import org.openecomp.dcae.apod.analytics.common.AnalyticsConstants;
 
  26 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
 
  27 import org.quartz.Scheduler;
 
  28 import org.quartz.SchedulerException;
 
  29 import org.slf4j.Logger;
 
  30 import org.slf4j.LoggerFactory;
 
  32 import java.util.concurrent.atomic.AtomicBoolean;
 
  34 import static java.lang.String.format;
 
  37  * Base logic for DMaaP Workers which uses scheduler to poll DMaaP MR topics at frequent intervals
 
  39  * @author Rajiv Singla . Creation Date: 12/19/2016.
 
  41 public abstract class BaseTCADMaaPMRWorker extends AbstractWorker {
 
  43     private static final Logger LOG = LoggerFactory.getLogger(BaseTCADMaaPMRWorker.class);
 
  48     protected Scheduler scheduler;
 
  50      * Determines if scheduler is shutdown
 
  52     protected AtomicBoolean isSchedulerShutdown;
 
  58         Preconditions.checkNotNull(scheduler, "Scheduler must not be null");
 
  59         String schedulerName = "";
 
  63             schedulerName = scheduler.getSchedulerName();
 
  65             isSchedulerShutdown.getAndSet(false);
 
  67         } catch (SchedulerException e) {
 
  68             final String errorMessage =
 
  69                     format("Error while starting TCA DMaaP MR scheduler name: %s, error: %s", schedulerName, e);
 
  70             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
 
  73         LOG.info("Successfully started DMaaP MR Scheduler: {}", schedulerName);
 
  75         // indefinite loop which wakes up and confirms scheduler is indeed running
 
  76         while (!isSchedulerShutdown.get()) {
 
  79                 Thread.sleep(AnalyticsConstants.TCA_DEFAULT_WORKER_SHUTDOWN_CHECK_INTERVAL_MS);
 
  81             } catch (InterruptedException e) {
 
  83                 final String errorMessage =
 
  84                         format("Error while checking TCA DMaaP MR Scheduler worker status name: %s, error: %s",
 
  86                 throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
 
  90         LOG.info("Finished execution of TCA DMaaP MR worker thread: {}", schedulerName);
 
  97         Preconditions.checkNotNull(scheduler, "Scheduler must not be null");
 
  98         String schedulerName = "";
 
 102             schedulerName = scheduler.getSchedulerName();
 
 103             LOG.info("Shutting TCA DMaaP MR Scheduler: {}", schedulerName);
 
 104             scheduler.shutdown();
 
 105             isSchedulerShutdown.getAndSet(true);
 
 107         } catch (SchedulerException e) {
 
 109             final String errorMessage =
 
 110                     format("Error while shutting down TCA DMaaP MR Scheduler: name: %s, error: %s", schedulerName, e);
 
 111             throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);