Sonar Critical Fix
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / streaming / dmaap / MockDMaaPMRReceiver.java
1 /*\r
2  * ===============================LICENSE_START======================================\r
3  *  dcae-analytics\r
4  * ================================================================================\r
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  *  Licensed under the Apache License, Version 2.0 (the "License");\r
8  *  you may not use this file except in compliance with the License.\r
9  *   You may obtain a copy of the License at\r
10  *\r
11  *          http://www.apache.org/licenses/LICENSE-2.0\r
12  *\r
13  *  Unless required by applicable law or agreed to in writing, software\r
14  *  distributed under the License is distributed on an "AS IS" BASIS,\r
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  *  See the License for the specific language governing permissions and\r
17  *  limitations under the License.\r
18  *  ============================LICENSE_END===========================================\r
19  */\r
20 \r
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap;\r
22 \r
23 import co.cask.cdap.api.data.format.StructuredRecord;\r
24 import com.fasterxml.jackson.core.type.TypeReference;\r
25 import org.apache.spark.storage.StorageLevel;\r
26 import org.apache.spark.streaming.receiver.Receiver;\r
27 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;\r
28 import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;\r
29 import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper;\r
30 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
31 import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;\r
32 import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;\r
33 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;\r
34 import org.slf4j.Logger;\r
35 import org.slf4j.LoggerFactory;\r
36 \r
37 import java.io.FileNotFoundException;\r
38 import java.io.IOException;\r
39 import java.io.InputStream;\r
40 import java.util.List;\r
41 import java.util.concurrent.TimeUnit;\r
42 \r
43 import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.readValue;\r
44 import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.writeValueAsString;\r
45 \r
46 /**\r
47  * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records\r
48  * <p>\r
49  * @author Rajiv Singla . Creation Date: 1/19/2017.\r
50  */\r
51 public class MockDMaaPMRReceiver extends Receiver<StructuredRecord> {\r
52 \r
53     private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRReceiver.class);\r
54     private static final long serialVersionUID = 1L;\r
55 \r
56     private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";\r
57     private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =\r
58             new TypeReference<List<EventListener>>() {\r
59             };\r
60 \r
61     private final DMaaPMRSourcePluginConfig pluginConfig;\r
62 \r
63     public MockDMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig) {\r
64         super(storageLevel);\r
65         this.pluginConfig = pluginConfig;\r
66         LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig);\r
67     }\r
68 \r
69     @Override\r
70     public void onStart() {\r
71 \r
72         // create DMaaP MR Subscriber\r
73         final DMaaPMRSubscriber subscriber =\r
74                 DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig));\r
75         storeStructuredRecords(subscriber);\r
76 \r
77     }\r
78 \r
79     @Override\r
80     public void onStop() {\r
81         LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig);\r
82     }\r
83 \r
84     /**\r
85      * Fetches records from DMaaP MR Subscriber and store them as structured records\r
86      *\r
87      * @param subscriber DMaaP MR Subscriber Instance\r
88      */\r
89     public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) {\r
90 \r
91         LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic");\r
92         \r
93         try (InputStream resourceAsStream =\r
94                      Thread.currentThread().getContextClassLoader().getResourceAsStream(MOCK_MESSAGE_FILE_LOCATION)) {\r
95 \r
96             if (resourceAsStream == null) {\r
97                 LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);\r
98                 throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());\r
99             }\r
100 \r
101             List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);\r
102 \r
103             final int totalMessageCount = eventListeners.size();\r
104             LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);\r
105 \r
106             int i = 1;\r
107             for (EventListener eventListener : eventListeners) {\r
108                 if (isStopped()) {\r
109                     return;\r
110                 }\r
111                 final String eventListenerString = writeValueAsString(eventListener);\r
112                 LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);\r
113                 store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(eventListenerString));\r
114                 i++;\r
115                 try {\r
116                     TimeUnit.MILLISECONDS.sleep(pluginConfig.getPollingInterval());\r
117                 } catch (InterruptedException e) {\r
118                     LOG.error("Error while sleeping");\r
119                     throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);\r
120                 }\r
121 \r
122             }\r
123 \r
124             LOG.debug("Finished writing mock messages to CDAP Stream");\r
125 \r
126         } catch (IOException e) {\r
127             LOG.error("Error while parsing json file");\r
128             throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);\r
129         }\r
130     }\r
131 \r
132 }\r