1164a9a884d69264d4840aadc73ca33b7c6a1331
[ccsdk/features.git] /
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
20
21 import java.util.Properties;
22
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import org.onap.dmaap.mr.client.MRClientFactory;
27 import org.onap.dmaap.mr.client.MRConsumer;
28 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
29
30 public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
31
32         private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
33
34         private final String name = this.getClass().getSimpleName();
35         private Properties properties = null;
36         private MRConsumer consumer = null;
37         private MRConsumerResponse consumerResponse = null;
38         private boolean running = false;
39         private boolean ready = false;
40         private int fetchPause = 5000; // Default pause between fetch - 5 seconds
41         private int timeout = 15000; // Default timeout - 15 seconds
42
43         protected DMaaPVESMsgConsumerImpl() {
44
45         }
46
47         /*
48          * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns. 
49          * If no data arrives on the topic, sleeps for a certain time period before checking again
50          */
51         @Override
52         public void run() {
53
54                 if (ready) {
55                         running = true;
56                         while (running) {
57                                 try {
58                                         boolean noData = true;
59                                         consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
60                                         for (String msg : consumerResponse.getActualMessages()) {
61                                                 noData = false;
62                                                 LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n"+msg);
63                                                 processMsg(msg);
64                                         }
65
66                                         if (noData) {
67                                                 LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
68                                                 LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
69                                                 if ((consumerResponse.getResponseCode() == null) && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
70                                                         LOG.warn("Client timeout while waiting for response from Server {}", consumerResponse.getResponseMessage());
71                                                 }
72                                                 pauseThread();
73                                         }
74                                 } catch (Exception e) {
75                                         LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
76                                         running = false;
77                                 }
78                         }
79                 }
80         }
81
82         /*
83          * Create a consumer by specifying  properties containing information such as topic name, timeout, URL etc 
84          */
85         @Override
86         public void init(Properties properties) {
87
88                 try {
89                         
90                         String timeoutStr = properties.getProperty("timeout");
91                         LOG.debug("timeoutStr: " + timeoutStr);
92
93                         if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
94                                 timeout = parseTimeOutValue(timeoutStr);
95                         }
96
97                         String fetchPauseStr = properties.getProperty("fetchPause");
98                         LOG.debug("fetchPause(Str): " + fetchPauseStr);
99                         if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
100                                 fetchPause = parseFetchPause(fetchPauseStr);
101                         }
102                         LOG.debug("fetchPause: " + fetchPause);
103
104                         this.consumer = MRClientFactory.createConsumer(properties);
105                         ready = true;
106                 } catch (Exception e) {
107                         LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
108                 }
109         }
110
111         private int parseTimeOutValue(String timeoutStr) {
112                 try {
113                         return Integer.parseInt(timeoutStr);
114                 } catch (NumberFormatException e) {
115                         LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
116                 }
117                 return timeout;
118         }
119
120         private int parseFetchPause(String fetchPauseStr) {
121                 try {
122                         return Integer.parseInt(fetchPauseStr);
123                 } catch (NumberFormatException e) {
124                         LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
125                 }
126                 return fetchPause;
127         }
128
129         private void pauseThread() throws InterruptedException {
130                 if (fetchPause > 0) {
131                         LOG.debug(String.format("No data received from fetch.  Pausing %d ms before retry", fetchPause));
132                         Thread.sleep(fetchPause);
133                 } else {
134                         LOG.debug("No data received from fetch.  No fetch pause specified - retrying immediately");
135                 }
136         }
137
138         @Override
139         public boolean isReady() {
140                 return ready;
141         }
142
143         @Override
144         public boolean isRunning() {
145                 return running;
146         }
147
148         public String getProperty(String name) {
149                 return properties.getProperty(name, "");
150         }
151
152         @Override
153         public void stopConsumer() {
154                 running = false;
155         }
156
157         public abstract void processMsg(String msg) throws Exception;
158
159 }