1 /*******************************************************************************
2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
17 ******************************************************************************/
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
21 import java.util.Properties;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 import org.onap.dmaap.mr.client.MRClientFactory;
27 import org.onap.dmaap.mr.client.MRConsumer;
28 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
/**
 * Abstract base for DMaaP VES message consumers. It holds the message-router consumer handle,
 * the most recent fetch response, the running/ready flags and the fetch timing settings.
 * Concrete subclasses supply processMsg(String).
 */
30 public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
32 private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
// Simple class name of the concrete subclass; used as a prefix in the debug log messages.
34 private final String name = this.getClass().getSimpleName();
// Backing store for getProperty(String); starts out null.
35 private Properties properties = null;
// Message-router consumer; init(Properties) creates it through MRClientFactory.
36 private MRConsumer consumer = null;
// Response object returned by the most recent fetch.
37 private MRConsumerResponse consumerResponse = null;
// NOTE(review): presumably the values reported by isRunning()/isReady(); the code that sets
// these flags is not visible here - confirm.
38 private boolean running = false;
39 private boolean ready = false;
// Milliseconds slept by pauseThread(); replaced by the "fetchPause" property in init() when set.
40 private int fetchPause = 5000; // Default pause between fetch - 5 seconds
// Passed as the first argument of each fetch; replaced by the "timeout" property in init() when set.
41 private int timeout = 15000; // Default timeout - 15 seconds
// Protected constructor: the class is abstract, so it is only reachable through subclasses.
43 protected DMaaPVESMsgConsumerImpl() {
48 * Thread body that fetches messages from the DMaaP topic. Each fetch waits for messages to arrive on the topic until the configured timeout expires and then returns.
49 * If no data arrives on the topic, the thread sleeps for the configured fetch pause before checking again.
// Starts as true for every fetch. NOTE(review): presumably cleared once a message is seen;
// that assignment is not visible here - confirm.
58 boolean noData = true;
// Fetch from the topic, passing the configured timeout as the first argument.
// NOTE(review): the meaning of the second argument (-1) is defined by the MRConsumer API - confirm.
59 consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
// Walk over every message contained in this fetch response.
60 for (String msg : consumerResponse.getActualMessages()) {
62 LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n"+msg);
// Log the response code and response message of the fetch.
// NOTE(review): presumably only done when no data arrived; the guarding condition is not visible here.
67 LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
68 LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
// Any exception thrown inside the try block is logged together with its stack trace.
71 } catch (Exception e) {
72 LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
80 * Creates a consumer from properties containing information such as topic name, timeout, URL, etc.
/**
 * Configures this consumer from the given properties: reads the optional "timeout" and
 * "fetchPause" entries and creates the message-router consumer through MRClientFactory.
 * Exceptions reaching the handler at the end of the method are logged at error level.
 *
 * @param properties consumer configuration; also handed unchanged to MRClientFactory.createConsumer
 */
83 public void init(Properties properties) {
87 String timeoutStr = properties.getProperty("timeout");
88 LOG.debug("timeoutStr: " + timeoutStr);
// Keep the default timeout unless a non-empty value is configured.
90 if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
91 timeout = parseTimeOutValue(timeoutStr);
94 String fetchPauseStr = properties.getProperty("fetchPause");
95 LOG.debug("fetchPause(Str): " + fetchPauseStr);
// Same rule for the pause between fetches: only a non-empty value replaces the default.
96 if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
97 fetchPause = parseFetchPause(fetchPauseStr);
99 LOG.debug("fetchPause: " + fetchPause);
// Build the consumer from the full property set.
101 this.consumer = MRClientFactory.createConsumer(properties);
103 } catch (Exception e) {
// NOTE(review): the message says "from file", but what is appended is the Properties object itself.
// NOTE(review): the properties may contain credentials, which would end up in the log - confirm.
104 LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
/**
 * Parses the configured timeout with Integer.parseInt.
 * A value that is not a valid integer is reported at error level.
 * NOTE(review): the value returned after a failed parse is not visible here - confirm the fallback.
 *
 * @param timeoutStr raw value of the "timeout" property
 * @return the parsed integer when timeoutStr is numeric
 */
108 private int parseTimeOutValue(String timeoutStr) {
110 return Integer.parseInt(timeoutStr);
111 } catch (NumberFormatException e) {
112 LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
/**
 * Parses the configured fetch pause with Integer.parseInt.
 * A value that is not a valid integer is reported at error level.
 * NOTE(review): the value returned after a failed parse is not visible here - confirm the fallback.
 *
 * @param fetchPauseStr raw value of the "fetchPause" property
 * @return the parsed integer when fetchPauseStr is numeric
 */
117 private int parseFetchPause(String fetchPauseStr) {
119 return Integer.parseInt(fetchPauseStr);
120 } catch (NumberFormatException e) {
121 LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
/**
 * Sleeps the calling thread for fetchPause milliseconds when fetchPause is positive.
 *
 * @throws InterruptedException if the thread is interrupted during Thread.sleep
 */
126 private void pauseThread() throws InterruptedException {
127 if (fetchPause > 0) {
128 LOG.debug(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
129 Thread.sleep(fetchPause);
// NOTE(review): presumably the branch taken when fetchPause is zero or negative; the branch
// line itself is not visible here - confirm.
131 LOG.debug("No data received from fetch. No fetch pause specified - retrying immediately");
// NOTE(review): body not visible here; presumably returns the `ready` field - confirm.
136 public boolean isReady() {
// NOTE(review): body not visible here; presumably returns the `running` field - confirm.
141 public boolean isRunning() {
/**
 * Looks up a configuration value in the stored properties.
 * NOTE(review): the `properties` field is declared as null and the place where it is assigned is
 * not visible here - confirm it is set before this method is called, otherwise this dereference fails.
 *
 * @param name property key to look up
 * @return the property value, or an empty string when the key is not present
 */
145 public String getProperty(String name) {
146 return properties.getProperty(name, "");
// NOTE(review): body not visible here; presumably clears the `running` flag - confirm.
150 public void stopConsumer() {
/**
 * Handles a single message; implemented by concrete subclasses.
 * NOTE(review): presumably invoked once for each message fetched from the topic; the call site is
 * not visible here - confirm.
 *
 * @param msg message text to process
 * @throws Exception implementation-defined failure while processing the message
 */
154 public abstract void processMsg(String msg) throws Exception;