2  * ===============================LICENSE_START======================================
\r 
   4  * ================================================================================
\r 
   5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r 
   6  * ================================================================================
\r 
   7  *  Licensed under the Apache License, Version 2.0 (the "License");
\r 
   8  *  you may not use this file except in compliance with the License.
\r 
   9  *   You may obtain a copy of the License at
\r 
  11  *          http://www.apache.org/licenses/LICENSE-2.0
\r 
  13  *  Unless required by applicable law or agreed to in writing, software
\r 
  14  *  distributed under the License is distributed on an "AS IS" BASIS,
\r 
  15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r 
  16  *  See the License for the specific language governing permissions and
\r 
  17  *  limitations under the License.
\r 
  18  *  ============================LICENSE_END===========================================
\r 
  21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
\r 
  23 import co.cask.cdap.api.annotation.Property;
\r 
  24 import co.cask.cdap.api.worker.AbstractWorker;
\r 
  25 import co.cask.cdap.api.worker.WorkerContext;
\r 
  26 import com.fasterxml.jackson.core.type.TypeReference;
\r 
  27 import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
\r 
  28 import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
\r 
  29 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
\r 
  30 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
\r 
  31 import org.slf4j.Logger;
\r 
  32 import org.slf4j.LoggerFactory;
\r 
  34 import java.io.FileNotFoundException;
\r 
  35 import java.io.IOException;
\r 
  36 import java.io.InputStream;
\r 
  37 import java.util.List;
\r 
  39 import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.readValue;
\r 
  40 import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.writeValueAsString;
\r 
  43  * CDAP Worker which mocks fetching VES Messages from DMaaP MR topic.
\r 
  44  * The mock instead of making DMaaP MR calls will actually take messages
\r 
  45  * from file and send them to stream at subscriber polling interval
\r 
  47  * TODO: To be removed before going to production - only for testing purposes
\r 
  49  * @author Rajiv Singla . Creation Date: 11/4/2016.
\r 
  51 public class TCADMaaPMockSubscriberWorker extends AbstractWorker {
\r 
  53     private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMockSubscriberWorker.class);
\r 
  55     // TODO: Remove this file before going to production - only for mocking purposes
\r 
  56     private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";
\r 
  57     private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =
\r 
  58             new TypeReference<List<EventListener>>() {
\r 
  61     private TCAAppPreferences tcaAppPreferences;
\r 
  62     private boolean stopSendingMessages;
\r 
  64     private final String tcaSubscriberOutputStreamName;
\r 
  66     public TCADMaaPMockSubscriberWorker(final String tcaSubscriberOutputStreamName) {
\r 
  67         this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName;
\r 
  71     public void configure() {
\r 
  72         setName("MockTCASubscriberWorker");
\r 
  73         setDescription("Writes Mocked VES messages to CDAP Stream");
\r 
  74         LOG.info("Configuring Mock TCA MR DMaaP Subscriber worker with name: {}", "MockTCASubscriberWorker");
\r 
  78     public void initialize(WorkerContext context) throws Exception {
\r 
  79         super.initialize(context);
\r 
  81         final TCAAppPreferences appPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);
\r 
  82         LOG.info("Initializing Mock TCA MR DMaaP Subscriber worker with preferences: {}", appPreferences);
\r 
  83         this.tcaAppPreferences = appPreferences;
\r 
  84         this.stopSendingMessages = false;
\r 
  90         final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval();
\r 
  91         LOG.debug("Mock TCA Subscriber Polling interval: {}", subscriberPollingInterval);
\r 
  93         final InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream
\r 
  94                 (MOCK_MESSAGE_FILE_LOCATION);
\r 
  96         if (resourceAsStream == null) {
\r 
  97             LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);
\r 
  98             throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());
\r 
 103             List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);
\r 
 105             final int totalMessageCount = eventListeners.size();
\r 
 106             LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);
\r 
 109             for (EventListener eventListener : eventListeners) {
\r 
 110                 if (stopSendingMessages) {
\r 
 111                     LOG.debug("Stop sending messages......");
\r 
 114                 final String eventListenerString = writeValueAsString(eventListener);
\r 
 115                 LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);
\r 
 116                 getContext().write(tcaSubscriberOutputStreamName, eventListenerString);
\r 
 120                     Thread.sleep(subscriberPollingInterval);
\r 
 121                 } catch (InterruptedException e) {
\r 
 122                     LOG.error("Error while sleeping");
\r 
 123                     throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);
\r 
 127             LOG.debug("Finished writing mock messages to CDAP Stream");
\r 
 129         } catch (IOException e) {
\r 
 130             LOG.error("Error while parsing json file");
\r 
 131             throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);
\r 
 138     public void stop() {
\r 
 139         stopSendingMessages = true;
\r