1 /*******************************************************************************
2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
17 ******************************************************************************/
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21 import java.util.Properties;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 import org.onap.dmaap.mr.client.MRClientFactory;
27 import org.onap.dmaap.mr.client.MRConsumer;
28 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
30 public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
32 private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
34 private final String name = this.getClass().getSimpleName();
35 private Properties properties = null;
36 private MRConsumer consumer = null;
37 private MRConsumerResponse consumerResponse = null;
38 private boolean running = false;
39 private boolean ready = false;
40 private int fetchPause = 5000; // Default pause between fetch - 5 seconds
41 private int timeout = 15000; // Default timeout - 15 seconds
43 protected DMaaPVESMsgConsumerImpl() {
48 * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns.
49 * If no data arrives on the topic, sleeps for a certain time period before checking again
58 boolean noData = true;
59 consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
60 for (String msg : consumerResponse.getActualMessages()) {
62 LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n"+msg);
67 LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
68 LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
69 if ((consumerResponse.getResponseCode() == null) && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
70 LOG.warn("Client timeout while waiting for response from Server {}", consumerResponse.getResponseMessage());
74 } catch (Exception e) {
75 LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
83 * Create a consumer by specifying properties containing information such as topic name, timeout, URL etc
86 public void init(Properties properties) {
90 String timeoutStr = properties.getProperty("timeout");
91 LOG.debug("timeoutStr: " + timeoutStr);
93 if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
94 timeout = parseTimeOutValue(timeoutStr);
97 String fetchPauseStr = properties.getProperty("fetchPause");
98 LOG.debug("fetchPause(Str): " + fetchPauseStr);
99 if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
100 fetchPause = parseFetchPause(fetchPauseStr);
102 LOG.debug("fetchPause: " + fetchPause);
104 this.consumer = MRClientFactory.createConsumer(properties);
106 } catch (Exception e) {
107 LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
111 private int parseTimeOutValue(String timeoutStr) {
113 return Integer.parseInt(timeoutStr);
114 } catch (NumberFormatException e) {
115 LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
120 private int parseFetchPause(String fetchPauseStr) {
122 return Integer.parseInt(fetchPauseStr);
123 } catch (NumberFormatException e) {
124 LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
129 private void pauseThread() throws InterruptedException {
130 if (fetchPause > 0) {
131 LOG.debug(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
132 Thread.sleep(fetchPause);
134 LOG.debug("No data received from fetch. No fetch pause specified - retrying immediately");
139 public boolean isReady() {
144 public boolean isRunning() {
148 public String getProperty(String name) {
149 return properties.getProperty(name, "");
153 public void stopConsumer() {
157 public abstract void processMsg(String msg) throws Exception;