2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21 import java.util.Properties;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 import org.onap.dmaap.mr.client.MRClientFactory;
27 import org.onap.dmaap.mr.client.MRConsumer;
28 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
30 public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
// Logger shared by all instances of this class.
32 private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
// Simple name of the concrete runtime class; used as a prefix in the debug log messages.
34 private final String name = this.getClass().getSimpleName();
// Read by getProperty(String). Starts out as null.
// NOTE(review): no assignment to this field is visible in this listing - confirm that init(...)
// stores its argument here, otherwise getProperty(String) dereferences null.
35 private Properties properties = null;
// Consumer created in init(...) through MRClientFactory.createConsumer(properties).
36 private MRConsumer consumer = null;
// Result of the most recent consumer.fetchWithReturnConsumerResponse(timeout, -1) call.
37 private MRConsumerResponse consumerResponse = null;
// NOTE(review): presumably the value reported by isRunning(); its body is not visible here - confirm.
38 private boolean running = false;
// NOTE(review): presumably the value reported by isReady(); its body is not visible here - confirm.
39 private boolean ready = false;
// Milliseconds slept by pauseThread() when positive; replaced in init(...) by the "fetchPause"
// property when that property is set to a non-empty value.
40 private int fetchPause = 5000; // Default pause between fetch - 5 seconds
// First argument passed to fetchWithReturnConsumerResponse; replaced in init(...) by the "timeout"
// property when that property is set to a non-empty value.
41 private int timeout = 15000; // Default timeout - 15 seconds
43 protected DMaaPVESMsgConsumerImpl() {
48 * Fetches messages from the DMaaP topic: each fetch waits for messages to arrive on the topic until the configured timeout expires, then returns.
49 * If no data arrives on the topic, the thread sleeps for the configured fetch pause before checking again.
58 boolean noData = true;
59 consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
60 for (String msg : consumerResponse.getActualMessages()) {
62 LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n" + msg);
67 LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
68 LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
69 if ((consumerResponse.getResponseCode() == null)
70 && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
71 LOG.warn("Client timeout while waiting for response from Server {}",
72 consumerResponse.getResponseMessage());
76 } catch (Exception e) {
77 LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
85 * Creates a consumer from properties containing information such as topic name, timeout, URL, etc.
88 public void init(Properties properties) {
92 String timeoutStr = properties.getProperty("timeout");
93 LOG.debug("timeoutStr: " + timeoutStr);
95 if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
96 timeout = parseTimeOutValue(timeoutStr);
99 String fetchPauseStr = properties.getProperty("fetchPause");
100 LOG.debug("fetchPause(Str): " + fetchPauseStr);
101 if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
102 fetchPause = parseFetchPause(fetchPauseStr);
104 LOG.debug("fetchPause: " + fetchPause);
106 this.consumer = MRClientFactory.createConsumer(properties);
108 } catch (Exception e) {
109 LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
113 private int parseTimeOutValue(String timeoutStr) {
115 return Integer.parseInt(timeoutStr);
116 } catch (NumberFormatException e) {
117 LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
122 private int parseFetchPause(String fetchPauseStr) {
124 return Integer.parseInt(fetchPauseStr);
125 } catch (NumberFormatException e) {
126 LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
131 private void pauseThread() throws InterruptedException {
132 if (fetchPause > 0) {
133 LOG.debug(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
134 Thread.sleep(fetchPause);
136 LOG.debug("No data received from fetch. No fetch pause specified - retrying immediately");
141 public boolean isReady() {
146 public boolean isRunning() {
150 public String getProperty(String name) {
151 return properties.getProperty(name, "");
155 public void stopConsumer() {
/**
 * Handles a single message given as a raw string; concrete subclasses supply the implementation.
 * NOTE(review): presumably invoked for each message fetched from the DMaaP topic - the call site
 * is not visible in this listing; confirm.
 *
 * @param msg the message text to process
 * @throws Exception if the implementation fails to process the message
 */
159 public abstract void processMsg(String msg) throws Exception;