Merge "KafkaLiveLockAvoider2.java: fixed sonar issues"
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / beans / DMaaPCambriaLimiter.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.beans;
23
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.concurrent.TimeUnit;
27
28 import org.springframework.beans.factory.annotation.Autowired;
29 import org.springframework.beans.factory.annotation.Qualifier;
30 import org.springframework.stereotype.Component;
31
32 import com.att.dmf.mr.CambriaApiException;
33 import com.att.dmf.mr.constants.CambriaConstants;
34 import com.att.dmf.mr.exception.DMaaPResponseCode;
35 import com.att.dmf.mr.exception.ErrorResponse;
36 import com.att.dmf.mr.utils.Utils;
37
38 import com.att.eelf.configuration.EELFLogger;
39 import com.att.eelf.configuration.EELFManager;
40 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
41 import com.att.nsa.drumlin.till.nv.rrNvReadable;
42 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
43 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
44 import com.att.nsa.metrics.impl.CdmRateTicker;
45
46 /**
47  * class provide rate information
48  * 
49  * @author anowarul.islam
50  *
51  */
52 @Component
53 public class DMaaPCambriaLimiter {
54         private final HashMap<String, RateInfo> fRateInfo;
55         private final HashMap<String, RateInfoCheck> fRateInfoCheck;
56         private final double fMaxEmptyPollsPerMinute;
57         private final double fMaxPollsPerMinute;
58         private final int fWindowLengthMins;
59         private final long fSleepMs;
60         private final long fSleepMs1;
61         private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
62         
63         /**
64          * constructor initializes
65          * 
66          * @param settings
67          * @throws missingReqdSetting
68          * @throws invalidSettingValue
69          */
70         @Autowired
71         public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
72                         fRateInfo = new HashMap<>();
73                 fRateInfoCheck = new HashMap<>();
74                 fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
75                                 CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
76                 fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
77                                 30);
78                 fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
79                                 CambriaConstants.kDefault_RateLimitWindowLength);
80                 fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
81                                 CambriaConstants.kDefault_SleepMsOnRateLimit);
82                 fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
83                                 5000);
84                 
85         }
86         
87         /**
88          * Construct a rate limiter.
89          * 
90          * @param maxEmptyPollsPerMinute
91          *            Pass <= 0 to deactivate rate limiting.
92          *            @param windowLengthMins
93          */
94         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
95                 this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
96         }
97
98         /**
99          * Construct a rate limiter
100          * 
101          * @param maxEmptyPollsPerMinute
102          *            Pass <= 0 to deactivate rate limiting.
103          * @param sleepMs
104          * @param windowLengthMins
105          */
106         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
107                 fRateInfo = new HashMap<>();
108                 fRateInfoCheck = new HashMap<>();
109                 fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
110                 fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
111                 fWindowLengthMins = windowLengthMins;
112                 fSleepMs = Math.max(0, sleepMs);
113                 fSleepMs1 = Math.max(0, sleepMS1);
114         }
115
116         /**
117          * static method provide the sleep time
118          * 
119          * @param ratePerMinute
120          * @return
121          */
122         public static long getSleepMsForRate(double ratePerMinute) {
123                 if (ratePerMinute <= 0.0)
124                         return 0;
125                 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
126         }
127
128         /**
129          * Tell the rate limiter about a call to a topic/group/id. If the rate is
130          * too high, this call delays its return and throws an exception.
131          * 
132          * @param topic
133          * @param consumerGroup
134          * @param clientId
135          * @throws CambriaApiException
136          */
137         public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
138                 // do nothing if rate is configured 0 or less
139                 if (fMaxEmptyPollsPerMinute <= 0) {
140                         return;
141                 }
142                                 // setup rate info for this tuple
143                 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
144                 final double rate = ri.onCall();
145                 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
146                 if (rate > fMaxEmptyPollsPerMinute) {
147                         try {
148                                 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
149                                                 + ".");
150                                 if (fSleepMs > 0) {
151                                         log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
152                                                         + " ms sleep, then responding in error.");
153                                         Thread.sleep(fSleepMs);
154                                         
155                                 } else {
156                                         log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
157                                 }
158                         } catch (InterruptedException e) {
159                                 log.error("Exception "+ e);
160                                 // ignore
161                         }
162                         
163                         
164                         ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
165                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
166                                         "This client is making too many requests. Please use a long poll "
167                                                         + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
168                         
169                         log.info(errRes.toString());
170                         throw new CambriaApiException(errRes);
171                 }
172                 /*if (fMaxPollsPerMinute <= 0) {
173                         return;
174                 }
175                 
176                 final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
177                 final double ratevalue = ric.onCall();
178                 if (ratevalue > fMaxPollsPerMinute) {
179                         try {
180                                 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
181                                                 + ".");
182                                 if (fSleepMs1 > fMaxPollsPerMinute) {
183                                 log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
184                                                         + " ms sleep, then responding in error.");
185                                         Thread.sleep(fSleepMs1);
186                                         ric.reset();
187                                 } else {
188                                         log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
189                                 }
190                         } catch (InterruptedException e) {
191                                 // ignore
192                         }
193                         
194                         
195                         ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
196                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
197                                         "This client is making too many requests "
198                                                         + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
199                         
200                         log.info(errRes.toString());
201                         throw new CambriaApiException(errRes);
202                 }*/
203                 
204         }
205
206         /**
207          * 
208          * @param topic
209          * @param consumerGroup
210          * @param clientId
211          * @param sentCount
212          */
213         public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
214                 // check for good replies
215                 if (sentCount > 0) {
216                         // that was a good send, reset the metric
217                         getRateInfo(topic, consumerGroup, clientId).reset();
218                 }
219         }
220
221         private static class RateInfo {
222                 private final String fLabel;
223                 private final CdmRateTicker fCallRateSinceLastMsgSend;
224                 /**
225                  * constructor initialzes
226                  * 
227                  * @param label
228                  * @param windowLengthMinutes
229                  */
230                 public RateInfo(String label, int windowLengthMinutes) {
231                         fLabel = label;
232                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
233                                         windowLengthMinutes, TimeUnit.MINUTES);
234                 }
235                 
236                 public String getLabel() {
237                         return fLabel;
238                 }
239
240                 /**
241                  * CdmRateTicker is reset
242                  */
243                 public void reset() {
244                         fCallRateSinceLastMsgSend.reset();
245                 }
246
247                 /**
248                  * 
249                  * @return
250                  */
251                 public double onCall() {
252                         fCallRateSinceLastMsgSend.tick();
253                         return fCallRateSinceLastMsgSend.getRate();
254                 }
255         }
256         
257         
258         
259         private static class RateInfoCheck {
260                 
261                 private final String fLabel;
262                 private final CdmRateTicker fCallRateSinceLastMsgSend;
263                 /**
264                  * constructor initialzes
265                  * 
266                  * @param label
267                  * @param windowLengthMinutes
268                  */
269                 public RateInfoCheck(String label, int windowLengthMinutes) {
270                         fLabel = label;
271                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
272                                         windowLengthMinutes, TimeUnit.MINUTES);
273                 }
274
275                 public String getLabel() {
276                         return fLabel;
277                 }
278
279                 /**
280                  * CdmRateTicker is reset
281                  */
282                 public void reset() {
283                         fCallRateSinceLastMsgSend.reset();
284                 }
285
286                 /**
287                  * 
288                  * @return
289                  */
290                 public double onCall() {
291                         fCallRateSinceLastMsgSend.tick();
292                         return fCallRateSinceLastMsgSend.getRate();
293                 }
294         }
295         
296         
297         
298         
299         private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
300                 final String key = makeKey(topic, consumerGroup, clientId);
301                 RateInfo ri = fRateInfo.get(key);
302                 if (ri == null) {
303                         ri = new RateInfo(key, fWindowLengthMins);
304                         fRateInfo.put(key, ri);
305                 }
306                 return ri;
307         }
308         
309         
310         /* private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
311                 final String key = makeKey(topic, consumerGroup, clientId);
312                 RateInfoCheck ri = fRateInfoCheck.get(key);
313                 if (ri == null) {
314                         ri = new RateInfoCheck(key, 1);
315                         fRateInfoCheck.put(key, ri);
316                 }
317                 return ri;
318         } */
319
320         
321         
322         
323         private String makeKey(String topic, String group, String id) {
324                 return topic + "::" + group + "::" + id;
325         }
326 }