2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21 import java.util.Properties;
22 import org.onap.dmaap.mr.client.MRClientFactory;
23 import org.onap.dmaap.mr.client.MRConsumer;
24 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
30 private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
32 private final String name = this.getClass().getSimpleName();
33 private Properties properties = null;
34 private MRConsumer consumer = null;
35 private boolean running = false;
36 private boolean ready = false;
37 private int fetchPause = 5000; // Default pause between fetch - 5 seconds
38 private int timeout = 15000; // Default timeout - 15 seconds
40 protected DMaaPVESMsgConsumerImpl() {
45 * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns.
46 * If no data arrives on the topic, sleeps for a certain time period before checking again
55 boolean noData = true;
56 MRConsumerResponse consumerResponse = null;
57 consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
58 for (String msg : consumerResponse.getActualMessages()) {
60 LOG.debug("{} received ActualMessage from DMaaP VES Message topic {}", name,msg);
65 LOG.debug("{} received ResponseCode: {}", name, consumerResponse.getResponseCode());
66 LOG.debug("{} received ResponseMessage: {}", name, consumerResponse.getResponseMessage());
67 if ((consumerResponse.getResponseCode() == null)
68 && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
69 LOG.warn("Client timeout while waiting for response from Server {}",
70 consumerResponse.getResponseMessage());
74 } catch (Exception e) {
75 LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
83 * Create a consumer by specifying properties containing information such as topic name, timeout, URL etc
86 public void init(Properties properties) {
90 String timeoutStr = properties.getProperty("timeout");
91 LOG.debug("timeoutStr: {}", timeoutStr);
93 if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
94 timeout = parseTimeOutValue(timeoutStr);
97 String fetchPauseStr = properties.getProperty("fetchPause");
98 LOG.debug("fetchPause(Str): {}",fetchPauseStr);
99 if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
100 fetchPause = parseFetchPause(fetchPauseStr);
102 LOG.debug("fetchPause: {} ",fetchPause);
104 this.consumer = MRClientFactory.createConsumer(properties);
106 } catch (Exception e) {
107 LOG.error("Error initializing DMaaP VES Message consumer from file {} {}",properties, e);
111 private int parseTimeOutValue(String timeoutStr) {
113 return Integer.parseInt(timeoutStr);
114 } catch (NumberFormatException e) {
115 LOG.error("Non-numeric value specified for timeout ({})",timeoutStr);
120 private int parseFetchPause(String fetchPauseStr) {
122 return Integer.parseInt(fetchPauseStr);
123 } catch (NumberFormatException e) {
124 LOG.error("Non-numeric value specified for fetchPause ({})",fetchPauseStr);
129 private void pauseThread() throws InterruptedException {
130 if (fetchPause > 0) {
131 LOG.debug("No data received from fetch. Pausing {} ms before retry", fetchPause);
132 Thread.sleep(fetchPause);
134 LOG.debug("No data received from fetch. No fetch pause specified - retrying immediately");
139 public boolean isReady() {
144 public boolean isRunning() {
148 public String getProperty(String name) {
149 return properties.getProperty(name, "");
153 public void stopConsumer() {
158 public abstract void processMsg(String msg) throws Exception;*/