66cce840f4e117b97f84cf1388036a33f538d417
[ccsdk/features.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
20
21 import java.util.Properties;
22
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import org.onap.dmaap.mr.client.MRClientFactory;
27 import org.onap.dmaap.mr.client.MRConsumer;
28 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
29
30 public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
31
32     private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
33
34     private final String name = this.getClass().getSimpleName();
35     private Properties properties = null;
36     private MRConsumer consumer = null;
37     private MRConsumerResponse consumerResponse = null;
38     private boolean running = false;
39     private boolean ready = false;
40     private int fetchPause = 5000; // Default pause between fetch - 5 seconds
41     private int timeout = 15000; // Default timeout - 15 seconds
42
43     protected DMaaPVESMsgConsumerImpl() {
44
45     }
46
47     /*
48      * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns. 
49      * If no data arrives on the topic, sleeps for a certain time period before checking again
50      */
51     @Override
52     public void run() {
53
54         if (ready) {
55             running = true;
56             while (running) {
57                 try {
58                     boolean noData = true;
59                     consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
60                     for (String msg : consumerResponse.getActualMessages()) {
61                         noData = false;
62                         LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n" + msg);
63                         processMsg(msg);
64                     }
65
66                     if (noData) {
67                         LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
68                         LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
69                         if ((consumerResponse.getResponseCode() == null)
70                                 && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
71                             LOG.warn("Client timeout while waiting for response from Server {}",
72                                     consumerResponse.getResponseMessage());
73                         }
74                         pauseThread();
75                     }
76                 } catch (Exception e) {
77                     LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
78                     running = false;
79                 }
80             }
81         }
82     }
83
84     /*
85      * Create a consumer by specifying  properties containing information such as topic name, timeout, URL etc 
86      */
87     @Override
88     public void init(Properties properties) {
89
90         try {
91
92             String timeoutStr = properties.getProperty("timeout");
93             LOG.debug("timeoutStr: " + timeoutStr);
94
95             if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
96                 timeout = parseTimeOutValue(timeoutStr);
97             }
98
99             String fetchPauseStr = properties.getProperty("fetchPause");
100             LOG.debug("fetchPause(Str): " + fetchPauseStr);
101             if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
102                 fetchPause = parseFetchPause(fetchPauseStr);
103             }
104             LOG.debug("fetchPause: " + fetchPause);
105
106             this.consumer = MRClientFactory.createConsumer(properties);
107             ready = true;
108         } catch (Exception e) {
109             LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
110         }
111     }
112
113     private int parseTimeOutValue(String timeoutStr) {
114         try {
115             return Integer.parseInt(timeoutStr);
116         } catch (NumberFormatException e) {
117             LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
118         }
119         return timeout;
120     }
121
122     private int parseFetchPause(String fetchPauseStr) {
123         try {
124             return Integer.parseInt(fetchPauseStr);
125         } catch (NumberFormatException e) {
126             LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
127         }
128         return fetchPause;
129     }
130
131     private void pauseThread() throws InterruptedException {
132         if (fetchPause > 0) {
133             LOG.debug(String.format("No data received from fetch.  Pausing %d ms before retry", fetchPause));
134             Thread.sleep(fetchPause);
135         } else {
136             LOG.debug("No data received from fetch.  No fetch pause specified - retrying immediately");
137         }
138     }
139
140     @Override
141     public boolean isReady() {
142         return ready;
143     }
144
145     @Override
146     public boolean isRunning() {
147         return running;
148     }
149
150     public String getProperty(String name) {
151         return properties.getProperty(name, "");
152     }
153
154     @Override
155     public void stopConsumer() {
156         running = false;
157     }
158
159     public abstract void processMsg(String msg) throws Exception;
160
161 }