Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-tca / src / main / java / org / openecomp / dcae / apod / analytics / cdap / tca / worker / TCADMaaPMockSubscriberWorker.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.tca.worker;\r
22 \r
23 import co.cask.cdap.api.annotation.Property;\r
24 import co.cask.cdap.api.worker.AbstractWorker;\r
25 import co.cask.cdap.api.worker.WorkerContext;\r
26 import com.fasterxml.jackson.core.type.TypeReference;\r
27 import org.openecomp.dcae.apod.analytics.cdap.tca.settings.TCAAppPreferences;\r
28 import org.openecomp.dcae.apod.analytics.cdap.tca.utils.CDAPTCAUtils;\r
29 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
30 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;\r
31 import org.slf4j.Logger;\r
32 import org.slf4j.LoggerFactory;\r
33 \r
34 import java.io.FileNotFoundException;\r
35 import java.io.IOException;\r
36 import java.io.InputStream;\r
37 import java.util.List;\r
38 \r
39 import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.readValue;\r
40 import static org.openecomp.dcae.apod.analytics.tca.utils.TCAUtils.writeValueAsString;\r
41 \r
42 /**\r
43  * CDAP Worker which mocks fetching VES Messages from DMaaP MR topic.\r
44  * The mock instead of making DMaaP MR calls will actually take messages\r
45  * from file and send them to stream at subscriber polling interval\r
46  *\r
47  * TODO: To be removed before going to production - only for testing purposes\r
48  *\r
49  * @author Rajiv Singla . Creation Date: 11/4/2016.\r
50  */\r
51 public class TCADMaaPMockSubscriberWorker extends AbstractWorker {\r
52 \r
53     private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMockSubscriberWorker.class);\r
54 \r
55     // TODO: Remove this file before going to production - only for mocking purposes\r
56     private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";\r
57     private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =\r
58             new TypeReference<List<EventListener>>() {\r
59             };\r
60 \r
61     private TCAAppPreferences tcaAppPreferences;\r
62     private boolean stopSendingMessages;\r
63     @Property\r
64     private final String tcaSubscriberOutputStreamName;\r
65 \r
66     public TCADMaaPMockSubscriberWorker(final String tcaSubscriberOutputStreamName) {\r
67         this.tcaSubscriberOutputStreamName = tcaSubscriberOutputStreamName;\r
68     }\r
69 \r
70     @Override\r
71     public void configure() {\r
72         setName("MockTCASubscriberWorker");\r
73         setDescription("Writes Mocked VES messages to CDAP Stream");\r
74         LOG.info("Configuring Mock TCA MR DMaaP Subscriber worker with name: {}", "MockTCASubscriberWorker");\r
75     }\r
76 \r
77     @Override\r
78     public void initialize(WorkerContext context) throws Exception {\r
79         super.initialize(context);\r
80 \r
81         final TCAAppPreferences appPreferences = CDAPTCAUtils.getValidatedTCAAppPreferences(context);\r
82         LOG.info("Initializing Mock TCA MR DMaaP Subscriber worker with preferences: {}", appPreferences);\r
83         this.tcaAppPreferences = appPreferences;\r
84         this.stopSendingMessages = false;\r
85     }\r
86 \r
87 \r
88     @Override\r
89     public void run() {\r
90         final Integer subscriberPollingInterval = tcaAppPreferences.getSubscriberPollingInterval();\r
91         LOG.debug("Mock TCA Subscriber Polling interval: {}", subscriberPollingInterval);\r
92 \r
93         final InputStream resourceAsStream = Thread.currentThread().getContextClassLoader().getResourceAsStream\r
94                 (MOCK_MESSAGE_FILE_LOCATION);\r
95 \r
96         if (resourceAsStream == null) {\r
97             LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);\r
98             throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());\r
99         }\r
100 \r
101 \r
102         try {\r
103             List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);\r
104 \r
105             final int totalMessageCount = eventListeners.size();\r
106             LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);\r
107 \r
108             int i = 1;\r
109             for (EventListener eventListener : eventListeners) {\r
110                 if (stopSendingMessages) {\r
111                     LOG.debug("Stop sending messages......");\r
112                     break;\r
113                 }\r
114                 final String eventListenerString = writeValueAsString(eventListener);\r
115                 LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);\r
116                 getContext().write(tcaSubscriberOutputStreamName, eventListenerString);\r
117                 i++;\r
118 \r
119                 try {\r
120                     Thread.sleep(subscriberPollingInterval);\r
121                 } catch (InterruptedException e) {\r
122                     LOG.error("Error while sleeping");\r
123                     throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);\r
124                 }\r
125             }\r
126 \r
127             LOG.debug("Finished writing mock messages to CDAP Stream");\r
128 \r
129         } catch (IOException e) {\r
130             LOG.error("Error while parsing json file");\r
131             throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);\r
132         }\r
133 \r
134 \r
135     }\r
136 \r
137     @Override\r
138     public void stop() {\r
139         stopSendingMessages = true;\r
140     }\r
141 }\r