bf8596cbcaf261b4714fd8b055cac3bf9eca2a23
[ccsdk/features.git] /
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt mountpoint-registrar
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18
19 package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
20
21 import java.util.Properties;
22
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import org.onap.dmaap.mr.client.MRClientFactory;
27 import org.onap.dmaap.mr.client.MRConsumer;
28 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
29
30 public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
31
32         private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
33
34         private final String name = this.getClass().getSimpleName();
35         private Properties properties = null;
36         private MRConsumer consumer = null;
37         private MRConsumerResponse consumerResponse = null;
38         private boolean running = false;
39         private boolean ready = false;
40         private int fetchPause = 5000; // Default pause between fetch - 5 seconds
41         private int timeout = 15000; // Default timeout - 15 seconds
42
43         protected DMaaPVESMsgConsumerImpl() {
44
45         }
46
47         /*
48          * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns. 
49          * If no data arrives on the topic, sleeps for a certain time period before checking again
50          */
51         @Override
52         public void run() {
53
54                 if (ready) {
55                         running = true;
56                         while (running) {
57                                 try {
58                                         boolean noData = true;
59                                         consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
60                                         for (String msg : consumerResponse.getActualMessages()) {
61                                                 noData = false;
62                                                 LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n"+msg);
63                                                 processMsg(msg);
64                                         }
65
66                                         if (noData) {
67                                                 LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
68                                                 LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
69                                                 pauseThread();
70                                         }
71                                 } catch (Exception e) {
72                                         LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
73                                         running = false;
74                                 }
75                         }
76                 }
77         }
78
79         /*
80          * Create a consumer by specifying  properties containing information such as topic name, timeout, URL etc 
81          */
82         @Override
83         public void init(Properties properties) {
84
85                 try {
86                         
87                         String timeoutStr = properties.getProperty("timeout");
88                         LOG.debug("timeoutStr: " + timeoutStr);
89
90                         if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
91                                 timeout = parseTimeOutValue(timeoutStr);
92                         }
93
94                         String fetchPauseStr = properties.getProperty("fetchPause");
95                         LOG.debug("fetchPause(Str): " + fetchPauseStr);
96                         if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
97                                 fetchPause = parseFetchPause(fetchPauseStr);
98                         }
99                         LOG.debug("fetchPause: " + fetchPause);
100
101                         this.consumer = MRClientFactory.createConsumer(properties);
102                         ready = true;
103                 } catch (Exception e) {
104                         LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
105                 }
106         }
107
108         private int parseTimeOutValue(String timeoutStr) {
109                 try {
110                         return Integer.parseInt(timeoutStr);
111                 } catch (NumberFormatException e) {
112                         LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
113                 }
114                 return timeout;
115         }
116
117         private int parseFetchPause(String fetchPauseStr) {
118                 try {
119                         return Integer.parseInt(fetchPauseStr);
120                 } catch (NumberFormatException e) {
121                         LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
122                 }
123                 return fetchPause;
124         }
125
126         private void pauseThread() throws InterruptedException {
127                 if (fetchPause > 0) {
128                         LOG.debug(String.format("No data received from fetch.  Pausing %d ms before retry", fetchPause));
129                         Thread.sleep(fetchPause);
130                 } else {
131                         LOG.debug("No data received from fetch.  No fetch pause specified - retrying immediately");
132                 }
133         }
134
135         @Override
136         public boolean isReady() {
137                 return ready;
138         }
139
140         @Override
141         public boolean isRunning() {
142                 return running;
143         }
144
145         public String getProperty(String name) {
146                 return properties.getProperty(name, "");
147         }
148
149         @Override
150         public void stopConsumer() {
151                 running = false;
152         }
153
154         public abstract void processMsg(String msg) throws Exception;
155
156 }