1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.dmf.mr.beans;
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.concurrent.TimeUnit;
28 import org.springframework.beans.factory.annotation.Autowired;
29 import org.springframework.beans.factory.annotation.Qualifier;
30 import org.springframework.stereotype.Component;
32 import com.att.dmf.mr.CambriaApiException;
33 import com.att.dmf.mr.constants.CambriaConstants;
34 import com.att.dmf.mr.exception.DMaaPResponseCode;
35 import com.att.dmf.mr.exception.ErrorResponse;
36 import com.att.dmf.mr.utils.Utils;
38 //import org.slf4j.Logger;
39 //import org.slf4j.LoggerFactory;
41 import com.att.eelf.configuration.EELFLogger;
42 import com.att.eelf.configuration.EELFManager;
43 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
44 import com.att.nsa.drumlin.till.nv.rrNvReadable;
45 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
46 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
47 import com.att.nsa.metrics.impl.CdmRateTicker;
50 * class provide rate information
52 * @author anowarul.islam
56 public class DMaaPCambriaLimiter {
58 * constructor initializes
61 * @throws missingReqdSetting
62 * @throws invalidSettingValue
65 public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
66 throws missingReqdSetting, invalidSettingValue {
67 fRateInfo = new HashMap<String, RateInfo>();
68 fRateInfoCheck = new HashMap<String, RateInfoCheck>();
69 fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
70 CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
71 fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
73 fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
74 CambriaConstants.kDefault_RateLimitWindowLength);
75 fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
76 CambriaConstants.kDefault_SleepMsOnRateLimit);
77 fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
83 * static method provide the sleep time
85 * @param ratePerMinute
88 public static long getSleepMsForRate(double ratePerMinute) {
89 if (ratePerMinute <= 0.0)
91 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
95 * Construct a rate limiter.
97 * @param maxEmptyPollsPerMinute
98 * Pass <= 0 to deactivate rate limiting.
99 * @param windowLengthMins
101 public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
102 this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
106 * Construct a rate limiter
108 * @param maxEmptyPollsPerMinute
109 * Pass <= 0 to deactivate rate limiting.
111 * @param windowLengthMins
113 public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
114 fRateInfo = new HashMap<String, RateInfo>();
115 fRateInfoCheck = new HashMap<String, RateInfoCheck>();
116 fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
117 fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
118 fWindowLengthMins = windowLengthMins;
119 fSleepMs = Math.max(0, sleepMs);
120 fSleepMs1 = Math.max(0, sleepMS1);
124 * Tell the rate limiter about a call to a topic/group/id. If the rate is
125 * too high, this call delays its return and throws an exception.
128 * @param consumerGroup
130 * @throws CambriaApiException
132 public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
133 // do nothing if rate is configured 0 or less
134 if (fMaxEmptyPollsPerMinute <= 0) {
137 // setup rate info for this tuple
138 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
139 final double rate = ri.onCall();
140 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
141 if (rate > fMaxEmptyPollsPerMinute) {
143 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
146 log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
147 + " ms sleep, then responding in error.");
148 Thread.sleep(fSleepMs);
151 log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
153 } catch (InterruptedException e) {
158 ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
159 DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
160 "This client is making too many requests. Please use a long poll "
161 + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
163 log.info(errRes.toString());
164 throw new CambriaApiException(errRes);
166 /*if (fMaxPollsPerMinute <= 0) {
170 final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
171 final double ratevalue = ric.onCall();
172 if (ratevalue > fMaxPollsPerMinute) {
174 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
176 if (fSleepMs1 > fMaxPollsPerMinute) {
177 log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
178 + " ms sleep, then responding in error.");
179 Thread.sleep(fSleepMs1);
182 log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
184 } catch (InterruptedException e) {
189 ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
190 DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
191 "This client is making too many requests "
192 + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
194 log.info(errRes.toString());
195 throw new CambriaApiException(errRes);
203 * @param consumerGroup
207 public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
208 // check for good replies
210 // that was a good send, reset the metric
211 getRateInfo(topic, consumerGroup, clientId).reset();
215 private static class RateInfo {
217 * constructor initialzes
220 * @param windowLengthMinutes
222 public RateInfo(String label, int windowLengthMinutes) {
224 fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
225 windowLengthMinutes, TimeUnit.MINUTES);
228 public String getLabel() {
233 * CdmRateTicker is reset
235 public void reset() {
236 fCallRateSinceLastMsgSend.reset();
243 public double onCall() {
244 fCallRateSinceLastMsgSend.tick();
245 return fCallRateSinceLastMsgSend.getRate();
248 private final String fLabel;
249 private final CdmRateTicker fCallRateSinceLastMsgSend;
254 private static class RateInfoCheck {
256 * constructor initialzes
259 * @param windowLengthMinutes
261 public RateInfoCheck(String label, int windowLengthMinutes) {
263 fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
264 windowLengthMinutes, TimeUnit.MINUTES);
267 public String getLabel() {
272 * CdmRateTicker is reset
274 public void reset() {
275 fCallRateSinceLastMsgSend.reset();
282 public double onCall() {
283 fCallRateSinceLastMsgSend.tick();
284 return fCallRateSinceLastMsgSend.getRate();
287 private final String fLabel;
288 private final CdmRateTicker fCallRateSinceLastMsgSend;
292 private final HashMap<String, RateInfo> fRateInfo;
293 private final HashMap<String, RateInfoCheck> fRateInfoCheck;
294 private final double fMaxEmptyPollsPerMinute;
295 private final double fMaxPollsPerMinute;
296 private final int fWindowLengthMins;
297 private final long fSleepMs;
298 private final long fSleepMs1;
299 //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
300 private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
302 private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
303 final String key = makeKey(topic, consumerGroup, clientId);
304 RateInfo ri = fRateInfo.get(key);
306 ri = new RateInfo(key, fWindowLengthMins);
307 fRateInfo.put(key, ri);
313 private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
314 final String key = makeKey(topic, consumerGroup, clientId);
315 RateInfoCheck ri = fRateInfoCheck.get(key);
317 ri = new RateInfoCheck(key, 1);
318 fRateInfoCheck.put(key, ri);
326 private String makeKey(String topic, String group, String id) {
327 return topic + "::" + group + "::" + id;