2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 
   6  * =================================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   8  * in compliance with the License. You may obtain a copy of the License at
 
  10  * http://www.apache.org/licenses/LICENSE-2.0
 
  12  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  14  * or implied. See the License for the specific language governing permissions and limitations under
 
  16  * ============LICENSE_END==========================================================================
 
  19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
 
  21 import java.util.Properties;
 
  23 import org.slf4j.Logger;
 
  24 import org.slf4j.LoggerFactory;
 
  26 import org.onap.dmaap.mr.client.MRClientFactory;
 
  27 import org.onap.dmaap.mr.client.MRConsumer;
 
  28 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 
  30 public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
 
  32     private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
 
  34     private final String name = this.getClass().getSimpleName();
 
  35     private Properties properties = null;
 
  36     private MRConsumer consumer = null;
 
  37     private MRConsumerResponse consumerResponse = null;
 
  38     private boolean running = false;
 
  39     private boolean ready = false;
 
  40     private int fetchPause = 5000; // Default pause between fetch - 5 seconds
 
  41     private int timeout = 15000; // Default timeout - 15 seconds
 
  43     protected DMaaPVESMsgConsumerImpl() {
 
  48      * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns. 
 
  49      * If no data arrives on the topic, sleeps for a certain time period before checking again
 
  58                     boolean noData = true;
 
  59                     consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
 
  60                     for (String msg : consumerResponse.getActualMessages()) {
 
  62                         LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n" + msg);
 
  67                         LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
 
  68                         LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
 
  69                         if ((consumerResponse.getResponseCode() == null)
 
  70                                 && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
 
  71                             LOG.warn("Client timeout while waiting for response from Server {}",
 
  72                                     consumerResponse.getResponseMessage());
 
  76                 } catch (Exception e) {
 
  77                     LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
 
  85      * Create a consumer by specifying  properties containing information such as topic name, timeout, URL etc 
 
  88     public void init(Properties properties) {
 
  92             String timeoutStr = properties.getProperty("timeout");
 
  93             LOG.debug("timeoutStr: " + timeoutStr);
 
  95             if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
 
  96                 timeout = parseTimeOutValue(timeoutStr);
 
  99             String fetchPauseStr = properties.getProperty("fetchPause");
 
 100             LOG.debug("fetchPause(Str): " + fetchPauseStr);
 
 101             if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
 
 102                 fetchPause = parseFetchPause(fetchPauseStr);
 
 104             LOG.debug("fetchPause: " + fetchPause);
 
 106             this.consumer = MRClientFactory.createConsumer(properties);
 
 108         } catch (Exception e) {
 
 109             LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
 
 113     private int parseTimeOutValue(String timeoutStr) {
 
 115             return Integer.parseInt(timeoutStr);
 
 116         } catch (NumberFormatException e) {
 
 117             LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
 
 122     private int parseFetchPause(String fetchPauseStr) {
 
 124             return Integer.parseInt(fetchPauseStr);
 
 125         } catch (NumberFormatException e) {
 
 126             LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
 
 131     private void pauseThread() throws InterruptedException {
 
 132         if (fetchPause > 0) {
 
 133             LOG.debug(String.format("No data received from fetch.  Pausing %d ms before retry", fetchPause));
 
 134             Thread.sleep(fetchPause);
 
 136             LOG.debug("No data received from fetch.  No fetch pause specified - retrying immediately");
 
 141     public boolean isReady() {
 
 146     public boolean isRunning() {
 
 150     public String getProperty(String name) {
 
 151         return properties.getProperty(name, "");
 
 155     public void stopConsumer() {
 
 159     public abstract void processMsg(String msg) throws Exception;