2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;
23 import co.cask.cdap.api.annotation.Property;
24 import co.cask.cdap.api.worker.AbstractWorker;
25 import co.cask.cdap.api.worker.WorkerContext;
26 import com.fasterxml.jackson.core.type.TypeReference;
27 import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;
28 import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;
29 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
30 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 import java.io.FileNotFoundException;
35 import java.io.IOException;
36 import java.io.InputStream;
37 import java.util.List;
39 import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.readValue;
40 import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.writeValueAsString;
43 * CDAP Worker which mocks fetching VES Messages from DMaaP MR topic.
44 * The mock instead of making DMaaP MR calls will actually take messages
45 * from file and send them to stream at subscriber polling interval
47 * TODO: To be removed before going to production - only for testing purposes
49 * @author Rajiv Singla . Creation Date: 11/4/2016.
51 public class TCADMaaPMockSubscriberWorker extends AbstractWorker {
53 private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMockSubscriberWorker.class);
55 // TODO: Remove this file before going to production - only for mocking purposes
56 private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";
57 private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =
58 new TypeReference<List<EventListener>>() {
61 private TCAAppPreferences tcaAppPreferences;
62 private boolean stopSendingMessages;
64 private final String tcaSubscriberOutputStreamName;
66 public TCADMaaPMockSubscriberWorker(final String tcaSubscriberOutputStreamName) {
67 this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName;
71 public void configure() {
72 setName("MockTCASubscriberWorker");
73 setDescription("Writes Mocked VES messages to CDAP Stream");
74 LOG.info("Configuring Mock TCA MR DMaaP Subscriber worker with name: {}", "MockTCASubscriberWorker");
78 public void initialize(WorkerContext context) throws Exception {
79 super.initialize(context);
81 final TCAAppPreferences appPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);
82 LOG.info("Initializing Mock TCA MR DMaaP Subscriber worker with preferences: {}", appPreferences);
83 this.tcaAppPreferences = appPreferences;
84 this.stopSendingMessages = false;
90 final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval();
91 LOG.debug("Mock TCA Subscriber Polling interval: {}", subscriberPollingInterval);
93 final InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream
94 (MOCK_MESSAGE_FILE_LOCATION);
96 if (resourceAsStream == null) {
97 LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);
98 throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());
103 List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);
105 final int totalMessageCount = eventListeners.size();
106 LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);
109 for (EventListener eventListener : eventListeners) {
110 if (stopSendingMessages) {
111 LOG.debug("Stop sending messages......");
114 final String eventListenerString = writeValueAsString(eventListener);
115 LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);
116 getContext().write(tcaSubscriberOutputStreamName, eventListenerString);
120 Thread.sleep(subscriberPollingInterval);
121 } catch (InterruptedException e) {
122 LOG.error("Error while sleeping");
123 throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);
127 LOG.debug("Finished writing mock messages to CDAP Stream");
129 } catch (IOException e) {
130 LOG.error("Error while parsing json file");
131 throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);
139 stopSendingMessages = true;