-/*\r
- * ===============================LICENSE_START======================================\r
- * dcae-analytics\r
- * ================================================================================\r
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * ================================================================================\r
- * Licensed under the Apache License, Version 2.0 (the "License");\r
- * you may not use this file except in compliance with the License.\r
- * You may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * Unless required by applicable law or agreed to in writing, software\r
- * distributed under the License is distributed on an "AS IS" BASIS,\r
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ============================LICENSE_END===========================================\r
- */\r
-\r
-package org.openecomp.dcae.apod.analytics.cdap.plugins.streaming.dmaap;\r
-\r
-import co.cask.cdap.api.data.format.StructuredRecord;\r
-import com.fasterxml.jackson.core.type.TypeReference;\r
-import org.apache.spark.storage.StorageLevel;\r
-import org.apache.spark.streaming.receiver.Receiver;\r
-import org.openecomp.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;\r
-import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;\r
-import org.openecomp.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper;\r
-import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;\r
-import org.openecomp.dcae.apod.analytics.dmaap.DMaaPMRFactory;\r
-import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;\r
-import org.openecomp.dcae.apod.analytics.model.domain.cef.EventListener;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-import java.io.FileNotFoundException;\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.util.List;\r
-import java.util.concurrent.TimeUnit;\r
-\r
-import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.readValue;\r
-import static org.openecomp.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.writeValueAsString;\r
-\r
-/**\r
- * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records\r
- * <p>\r
- * @author Rajiv Singla . Creation Date: 1/19/2017.\r
- */\r
-public class MockDMaaPMRReceiver extends Receiver<StructuredRecord> {\r
-\r
- private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRReceiver.class);\r
- private static final long serialVersionUID = 1L;\r
-\r
- private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";\r
- private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =\r
- new TypeReference<List<EventListener>>() {\r
- };\r
-\r
- private final DMaaPMRSourcePluginConfig pluginConfig;\r
-\r
- public MockDMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig) {\r
- super(storageLevel);\r
- this.pluginConfig = pluginConfig;\r
- LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig);\r
- }\r
-\r
- @Override\r
- public void onStart() {\r
-\r
- // create DMaaP MR Subscriber\r
- final DMaaPMRSubscriber subscriber =\r
- DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig));\r
- storeStructuredRecords(subscriber);\r
-\r
- }\r
-\r
- @Override\r
- public void onStop() {\r
- LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig);\r
- }\r
-\r
- /**\r
- * Fetches records from DMaaP MR Subscriber and store them as structured records\r
- *\r
- * @param subscriber DMaaP MR Subscriber Instance\r
- */\r
- public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) {\r
-\r
- LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic");\r
- \r
- try (InputStream resourceAsStream =\r
- Thread.currentThread().getContextClassLoader().getResourceAsStream(MOCK_MESSAGE_FILE_LOCATION)) {\r
-\r
- if (resourceAsStream == null) {\r
- LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);\r
- throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());\r
- }\r
-\r
- List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);\r
-\r
- final int totalMessageCount = eventListeners.size();\r
- LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);\r
-\r
- int i = 1;\r
- for (EventListener eventListener : eventListeners) {\r
- if (isStopped()) {\r
- return;\r
- }\r
- final String eventListenerString = writeValueAsString(eventListener);\r
- LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);\r
- store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(eventListenerString));\r
- i++;\r
- try {\r
- TimeUnit.MILLISECONDS.sleep(pluginConfig.getPollingInterval());\r
- } catch (InterruptedException e) {\r
- LOG.error("Error while sleeping");\r
- throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);\r
- }\r
-\r
- }\r
-\r
- LOG.debug("Finished writing mock messages to CDAP Stream");\r
-\r
- } catch (IOException e) {\r
- LOG.error("Error while parsing json file");\r
- throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);\r
- }\r
- }\r
-\r
-}\r
+/*
+ * ===============================LICENSE_START======================================
+ * dcae-analytics
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============================LICENSE_END===========================================
+ */
+
+package org.onap.dcae.apod.analytics.cdap.plugins.streaming.dmaap;
+
+import co.cask.cdap.api.data.format.StructuredRecord;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.receiver.Receiver;
+import org.onap.dcae.apod.analytics.cdap.plugins.domain.config.dmaap.DMaaPMRSourcePluginConfig;
+import org.onap.dcae.apod.analytics.cdap.plugins.utils.CDAPPluginUtils;
+import org.onap.dcae.apod.analytics.cdap.plugins.utils.DMaaPSourceConfigMapper;
+import org.onap.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
+import org.onap.dcae.apod.analytics.dmaap.DMaaPMRFactory;
+import org.onap.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
+import org.onap.dcae.apod.analytics.model.domain.cef.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.readValue;
+import static org.onap.dcae.apod.analytics.model.util.AnalyticsModelJsonUtils.writeValueAsString;
+
+/**
+ * DMaaP MR Receiver which calls DMaaP MR Topic and stores structured records
+ * <p>
+ * @author Rajiv Singla . Creation Date: 1/19/2017.
+ */
+public class MockDMaaPMRReceiver extends Receiver<StructuredRecord> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MockDMaaPMRReceiver.class);
+ private static final long serialVersionUID = 1L;
+
+ private static final String MOCK_MESSAGE_FILE_LOCATION = "ves_mock_messages.json";
+ private static final TypeReference<List<EventListener>> EVENT_LISTENER_TYPE_REFERENCE =
+ new TypeReference<List<EventListener>>() {
+ };
+
+ private final DMaaPMRSourcePluginConfig pluginConfig;
+
+ public MockDMaaPMRReceiver(final StorageLevel storageLevel, final DMaaPMRSourcePluginConfig pluginConfig) {
+ super(storageLevel);
+ this.pluginConfig = pluginConfig;
+ LOG.debug("Created DMaaP MR Receiver instance with plugin Config: {}", pluginConfig);
+ }
+
+ @Override
+ public void onStart() {
+
+ // create DMaaP MR Subscriber
+ final DMaaPMRSubscriber subscriber =
+ DMaaPMRFactory.create().createSubscriber(DMaaPSourceConfigMapper.map(pluginConfig));
+ storeStructuredRecords(subscriber);
+
+ }
+
+ @Override
+ public void onStop() {
+ LOG.debug("Stopping DMaaP MR Receiver with plugin config: {}", pluginConfig);
+ }
+
+ /**
+ * Fetches records from DMaaP MR Subscriber and store them as structured records
+ *
+ * @param subscriber DMaaP MR Subscriber Instance
+ */
+ public void storeStructuredRecords(final DMaaPMRSubscriber subscriber) {
+
+ LOG.debug("DMaaP MR Receiver start fetching messages from DMaaP MR Topic");
+
+ try (InputStream resourceAsStream =
+ Thread.currentThread().getContextClassLoader().getResourceAsStream(MOCK_MESSAGE_FILE_LOCATION)) {
+
+ if (resourceAsStream == null) {
+ LOG.error("Unable to find file at location: {}", MOCK_MESSAGE_FILE_LOCATION);
+ throw new DCAEAnalyticsRuntimeException("Unable to find file", LOG, new FileNotFoundException());
+ }
+
+ List<EventListener> eventListeners = readValue(resourceAsStream, EVENT_LISTENER_TYPE_REFERENCE);
+
+ final int totalMessageCount = eventListeners.size();
+ LOG.debug("Mock message count to be written to cdap stream: ()", totalMessageCount);
+
+ int i = 1;
+ for (EventListener eventListener : eventListeners) {
+ if (isStopped()) {
+ return;
+ }
+ final String eventListenerString = writeValueAsString(eventListener);
+ LOG.debug("=======>> Writing message to cdap stream no: {} of {}", i, totalMessageCount);
+ store(CDAPPluginUtils.createDMaaPMRResponseStructuredRecord(eventListenerString));
+ i++;
+ try {
+ TimeUnit.MILLISECONDS.sleep(pluginConfig.getPollingInterval());
+ } catch (InterruptedException e) {
+ LOG.error("Error while sleeping");
+ throw new DCAEAnalyticsRuntimeException("Error while sleeping", LOG, e);
+ }
+
+ }
+
+ LOG.debug("Finished writing mock messages to CDAP Stream");
+
+ } catch (IOException e) {
+ LOG.error("Error while parsing json file");
+ throw new DCAEAnalyticsRuntimeException("Error while parsing mock json file", LOG, e);
+ }
+ }
+
+}