2b416e7dbc7d1196dcdca5cc7ac1d5a939bccbcb
[ccsdk/sli/northbound.git] / SdncDmaapConsumer.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : SDN-C
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights
6  *                      reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.ccsdk.sli.northbound.dmaapclient;
23
24 import com.att.nsa.mr.client.MRClientFactory;
25 import com.att.nsa.mr.client.MRConsumer;
26 import com.att.nsa.mr.client.response.MRConsumerResponse;
27 import java.io.File;
28 import java.io.FileInputStream;
29 import java.util.Properties;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public abstract class SdncDmaapConsumer implements Runnable {
34
35     private static final Logger LOG = LoggerFactory
36         .getLogger(SdncDmaapConsumer.class);
37
38     private Properties properties = null;
39     private MRConsumer consumer = null;
40     private MRConsumerResponse consumerResponse = null;
41     private boolean running = false;
42     private boolean ready = false;
43     private int fetchPause = 5000; // Default pause between fetch - 5 seconds
44     private int timeout = 15000; // Default timeout - 15 seconds
45
46     public SdncDmaapConsumer() {
47
48     }
49
50     public SdncDmaapConsumer(Properties properties, String propertiesPath) {
51         init(properties, propertiesPath);
52     }
53
54     public boolean isReady() {
55         return ready;
56     }
57
58     public boolean isRunning() {
59         return running;
60     }
61
62     public String getProperty(String name) {
63         return properties.getProperty(name, "");
64     }
65
66     public void init(Properties properties, String propertiesPath) {
67
68         try (FileInputStream in = new FileInputStream(new File(propertiesPath))) {
69
70             LOG.debug("propertiesPath: " + propertiesPath);
71             this.properties = (Properties) properties.clone();
72             this.properties.load(in);
73
74
75             String timeoutStr = this.properties.getProperty("timeout");
76             LOG.debug("timeoutStr: " + timeoutStr);
77
78             if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
79                 timeout = parseTimeOutValue(timeoutStr);
80             }
81
82             String fetchPauseStr = this.properties.getProperty("fetchPause");
83             LOG.debug("fetchPause(Str): " + fetchPauseStr);
84             if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
85                 fetchPause = parseFetchPause(fetchPauseStr);
86             }
87             LOG.debug("fetchPause: " + fetchPause);
88
89
90             this.consumer = MRClientFactory.createConsumer(propertiesPath);
91             ready = true;
92         } catch (Exception e) {
93             LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e);
94         }
95     }
96
97     private int parseTimeOutValue(String timeoutStr) {
98         try {
99             return Integer.parseInt(timeoutStr);
100         } catch (NumberFormatException e) {
101             LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
102         }
103         return timeout;
104     }
105
106     private int parseFetchPause(String fetchPauseStr) {
107         try {
108             return Integer.parseInt(fetchPauseStr);
109         } catch (NumberFormatException e) {
110             LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
111         }
112         return fetchPause;
113     }
114
115
116     @Override
117     public void run() {
118         if (ready) {
119
120             running = true;
121
122             while (running) {
123
124                 try {
125                     boolean noData = true;
126                     consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
127                     for (String msg : consumerResponse.getActualMessages()) {
128                         noData = false;
129                         LOG.info("Received message from DMaaP:\n" + msg);
130                         processMsg(msg);
131                     }
132
133                     if (noData) {
134                         pauseThread();
135                     }
136                 } catch (Exception e) {
137                     LOG.error("Caught exception reading from DMaaP", e);
138                     running = false;
139                 }
140             }
141         }
142     }
143
144     private void pauseThread() throws InterruptedException {
145         if (fetchPause > 0) {
146             LOG.info(String.format("No data received from fetch.  Pausing %d ms before retry", fetchPause));
147             Thread.sleep(fetchPause);
148         } else {
149             LOG.info("No data received from fetch.  No fetch pause specified - retrying immediately");
150         }
151     }
152
153     abstract public void processMsg(String msg) throws InvalidMessageException;
154 }