Initial TCA commit into DCAEGEN2
[dcaegen2/analytics/tca.git] / dcae-analytics-cdap-plugins / src / main / java / org / openecomp / dcae / apod / analytics / cdap / plugins / streaming / dmaap / MockDMaaPMRReceiver.java
1 /*
2  * ===============================LICENSE_START======================================
3  *  dcae-analytics
4  * ================================================================================
5  *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *   You may obtain a copy of the License at
10  *
11  *          http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============================LICENSE_END===========================================
19  */
20
21 package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
22
23 import co.cask.cdap.api.data.format.StructuredRecord;
24 import com.fasterxml.jackson.core.type.TypeReference;
25 import org.apache.spark.storage.StorageLevel;
26 import org.apache.spark.streaming.receiver.Receiver;
27 import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
28 import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
29 import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper;
30 import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
31 import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;
32 import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
33 import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import java.io.FileNotFoundException;
38 import java.io.IOException;
39 import java.io.InputStream;
40 import java.util.List;
41 import java.util.concurrent.TimeUnit;
42
43 import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.readValue;
44 import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.writeValueAsString;
45
46 /**
47  * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records
48  * <p>
49  * @author Rajiv Singla . Creation Date: 1/19/2017.
50  */
51 public class MockDMaaPMRReceiver extends Receiver<StructuredRecord> {
52
53     private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRReceiver.class);
54     private static final long serialVersionUID = 1L;
55
56     private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";
57     private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =
58             new TypeReference<List<EventListener>>() {
59             };
60
61     private final DMaaPMRSourcePluginConfig pluginConfig;
62
63     public MockDMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig) {
64         super(storageLevel);
65         this.pluginConfig = pluginConfig;
66         LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig);
67     }
68
69     @Override
70     public void onStart() {
71
72         // create DMaaP MR Subscriber
73         final DMaaPMRSubscriber subscriber =
74                 DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig));
75         storeStructuredRecords(subscriber);
76
77     }
78
79     @Override
80     public void onStop() {
81         LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig);
82     }
83
84     /**
85      * Fetches records from DMaaP MR Subscriber and store them as structured records
86      *
87      * @param subscriber DMaaP MR Subscriber Instance
88      */
89     public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) {
90
91         LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic");
92         
93         try (InputStream resourceAsStream =
94                      Thread.currentThread().getContextClassLoader().getResourceAsStream(MOCK_MESSAGE_FILE_LOCATION)) {
95
96             if (resourceAsStream == null) {
97                 LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);
98                 throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());
99             }
100
101             List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);
102
103             final int totalMessageCount = eventListeners.size();
104             LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);
105
106             int i = 1;
107             for (EventListener eventListener : eventListeners) {
108                 if (isStopped()) {
109                     return;
110                 }
111                 final String eventListenerString = writeValueAsString(eventListener);
112                 LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);
113                 store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(eventListenerString));
114                 i++;
115                 try {
116                     TimeUnit.MILLISECONDS.sleep(pluginConfig.getPollingInterval());
117                 } catch (InterruptedException e) {
118                     LOG.error("Error while sleeping");
119                     throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);
120                 }
121
122             }
123
124             LOG.debug("Finished writing mock messages to CDAP Stream");
125
126         } catch (IOException e) {
127             LOG.error("Error while parsing json file");
128             throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);
129         }
130     }
131
132 }