Merge "sonar critical for Conditional Statement"
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / beans / DMaaPCambriaLimiter.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.beans;
23
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.concurrent.TimeUnit;
27
28 import org.springframework.beans.factory.annotation.Autowired;
29 import org.springframework.beans.factory.annotation.Qualifier;
30 import org.springframework.stereotype.Component;
31
32 import com.att.dmf.mr.CambriaApiException;
33 import com.att.dmf.mr.constants.CambriaConstants;
34 import com.att.dmf.mr.exception.DMaaPResponseCode;
35 import com.att.dmf.mr.exception.ErrorResponse;
36 import com.att.dmf.mr.utils.Utils;
37
38 //import org.slf4j.Logger;
39 //import org.slf4j.LoggerFactory;
40
41 import com.att.eelf.configuration.EELFLogger;
42 import com.att.eelf.configuration.EELFManager;
43 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
44 import com.att.nsa.drumlin.till.nv.rrNvReadable;
45 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
46 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
47 import com.att.nsa.metrics.impl.CdmRateTicker;
48
49 /**
50  * class provide rate information
51  * 
52  * @author anowarul.islam
53  *
54  */
55 @Component
56 public class DMaaPCambriaLimiter {
57         /**
58          * constructor initializes
59          * 
60          * @param settings
61          * @throws missingReqdSetting
62          * @throws invalidSettingValue
63          */
64         @Autowired
65         public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
66                         throws missingReqdSetting, invalidSettingValue {
67                         fRateInfo = new HashMap<String, RateInfo>();
68                 fRateInfoCheck = new HashMap<String, RateInfoCheck>();
69                 fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
70                                 CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
71                 fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
72                                 30);
73                 fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
74                                 CambriaConstants.kDefault_RateLimitWindowLength);
75                 fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
76                                 CambriaConstants.kDefault_SleepMsOnRateLimit);
77                 fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
78                                 5000);
79                 
80         }
81
82         /**
83          * static method provide the sleep time
84          * 
85          * @param ratePerMinute
86          * @return
87          */
88         public static long getSleepMsForRate(double ratePerMinute) {
89                 if (ratePerMinute <= 0.0)
90                         return 0;
91                 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
92         }
93
94         /**
95          * Construct a rate limiter.
96          * 
97          * @param maxEmptyPollsPerMinute
98          *            Pass <= 0 to deactivate rate limiting.
99          *            @param windowLengthMins
100          */
101         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
102                 this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
103         }
104
105         /**
106          * Construct a rate limiter
107          * 
108          * @param maxEmptyPollsPerMinute
109          *            Pass <= 0 to deactivate rate limiting.
110          * @param sleepMs
111          * @param windowLengthMins
112          */
113         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
114                 fRateInfo = new HashMap<String, RateInfo>();
115                 fRateInfoCheck = new HashMap<String, RateInfoCheck>();
116                 fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
117                 fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
118                 fWindowLengthMins = windowLengthMins;
119                 fSleepMs = Math.max(0, sleepMs);
120                 fSleepMs1 = Math.max(0, sleepMS1);
121         }
122
123         /**
124          * Tell the rate limiter about a call to a topic/group/id. If the rate is
125          * too high, this call delays its return and throws an exception.
126          * 
127          * @param topic
128          * @param consumerGroup
129          * @param clientId
130          * @throws CambriaApiException
131          */
132         public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
133                 // do nothing if rate is configured 0 or less
134                 if (fMaxEmptyPollsPerMinute <= 0) {
135                         return;
136                 }
137                                 // setup rate info for this tuple
138                 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
139                 final double rate = ri.onCall();
140                 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
141                 if (rate > fMaxEmptyPollsPerMinute) {
142                         try {
143                                 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
144                                                 + ".");
145                                 if (fSleepMs > 0) {
146                                         log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
147                                                         + " ms sleep, then responding in error.");
148                                         Thread.sleep(fSleepMs);
149                                         
150                                 } else {
151                                         log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
152                                 }
153                         } catch (InterruptedException e) {
154                                 // ignore
155                         }
156                         
157                         
158                         ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
159                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
160                                         "This client is making too many requests. Please use a long poll "
161                                                         + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
162                         
163                         log.info(errRes.toString());
164                         throw new CambriaApiException(errRes);
165                 }
166                 /*if (fMaxPollsPerMinute <= 0) {
167                         return;
168                 }
169                 
170                 final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
171                 final double ratevalue = ric.onCall();
172                 if (ratevalue > fMaxPollsPerMinute) {
173                         try {
174                                 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
175                                                 + ".");
176                                 if (fSleepMs1 > fMaxPollsPerMinute) {
177                                 log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
178                                                         + " ms sleep, then responding in error.");
179                                         Thread.sleep(fSleepMs1);
180                                         ric.reset();
181                                 } else {
182                                         log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
183                                 }
184                         } catch (InterruptedException e) {
185                                 // ignore
186                         }
187                         
188                         
189                         ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
190                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
191                                         "This client is making too many requests "
192                                                         + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
193                         
194                         log.info(errRes.toString());
195                         throw new CambriaApiException(errRes);
196                 }*/
197                 
198         }
199
200         /**
201          * 
202          * @param topic
203          * @param consumerGroup
204          * @param clientId
205          * @param sentCount
206          */
207         public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
208                 // check for good replies
209                 if (sentCount > 0) {
210                         // that was a good send, reset the metric
211                         getRateInfo(topic, consumerGroup, clientId).reset();
212                 }
213         }
214
215         private static class RateInfo {
216                 /**
217                  * constructor initialzes
218                  * 
219                  * @param label
220                  * @param windowLengthMinutes
221                  */
222                 public RateInfo(String label, int windowLengthMinutes) {
223                         fLabel = label;
224                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
225                                         windowLengthMinutes, TimeUnit.MINUTES);
226                 }
227                 
228                 public String getLabel() {
229                         return fLabel;
230                 }
231
232                 /**
233                  * CdmRateTicker is reset
234                  */
235                 public void reset() {
236                         fCallRateSinceLastMsgSend.reset();
237                 }
238
239                 /**
240                  * 
241                  * @return
242                  */
243                 public double onCall() {
244                         fCallRateSinceLastMsgSend.tick();
245                         return fCallRateSinceLastMsgSend.getRate();
246                 }
247
248                 private final String fLabel;
249                 private final CdmRateTicker fCallRateSinceLastMsgSend;
250         }
251         
252         
253         
254         private static class RateInfoCheck {
255                 /**
256                  * constructor initialzes
257                  * 
258                  * @param label
259                  * @param windowLengthMinutes
260                  */
261                 public RateInfoCheck(String label, int windowLengthMinutes) {
262                         fLabel = label;
263                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
264                                         windowLengthMinutes, TimeUnit.MINUTES);
265                 }
266
267                 public String getLabel() {
268                         return fLabel;
269                 }
270
271                 /**
272                  * CdmRateTicker is reset
273                  */
274                 public void reset() {
275                         fCallRateSinceLastMsgSend.reset();
276                 }
277
278                 /**
279                  * 
280                  * @return
281                  */
282                 public double onCall() {
283                         fCallRateSinceLastMsgSend.tick();
284                         return fCallRateSinceLastMsgSend.getRate();
285                 }
286
287                 private final String fLabel;
288                 private final CdmRateTicker fCallRateSinceLastMsgSend;
289         }
290         
291         
292         private final HashMap<String, RateInfo> fRateInfo;
293         private final HashMap<String, RateInfoCheck> fRateInfoCheck;
294         private final double fMaxEmptyPollsPerMinute;
295         private final double fMaxPollsPerMinute;
296         private final int fWindowLengthMins;
297         private final long fSleepMs;
298         private final long fSleepMs1;
299         //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
300         private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
301         
302         private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
303                 final String key = makeKey(topic, consumerGroup, clientId);
304                 RateInfo ri = fRateInfo.get(key);
305                 if (ri == null) {
306                         ri = new RateInfo(key, fWindowLengthMins);
307                         fRateInfo.put(key, ri);
308                 }
309                 return ri;
310         }
311         
312         
313         private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
314                 final String key = makeKey(topic, consumerGroup, clientId);
315                 RateInfoCheck ri = fRateInfoCheck.get(key);
316                 if (ri == null) {
317                         ri = new RateInfoCheck(key, 1);
318                         fRateInfoCheck.put(key, ri);
319                 }
320                 return ri;
321         }
322
323         
324         
325         
326         private String makeKey(String topic, String group, String id) {
327                 return topic + "::" + group + "::" + id;
328         }
329 }