/*******************************************************************************
 *  ============LICENSE_START=======================================================
 *  ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 *  ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============LICENSE_END=========================================================
 *
 *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
 *
 *******************************************************************************/
package com.att.dmf.mr.beans;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import com.att.dmf.mr.CambriaApiException;
import com.att.dmf.mr.constants.CambriaConstants;
import com.att.dmf.mr.exception.DMaaPResponseCode;
import com.att.dmf.mr.exception.ErrorResponse;
import com.att.dmf.mr.utils.Utils;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.nsa.metrics.impl.CdmRateTicker;
47 * class provide rate information
49 * @author anowarul.islam
53 public class DMaaPCambriaLimiter {
54 private final HashMap<String, RateInfo> fRateInfo;
55 private final HashMap<String, RateInfoCheck> fRateInfoCheck;
56 private final double fMaxEmptyPollsPerMinute;
57 private final double fMaxPollsPerMinute;
58 private final int fWindowLengthMins;
59 private final long fSleepMs;
60 private final long fSleepMs1;
61 private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
64 * constructor initializes
67 * @throws missingReqdSetting
68 * @throws invalidSettingValue
71 public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
72 fRateInfo = new HashMap<>();
73 fRateInfoCheck = new HashMap<>();
74 fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
75 CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
76 fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
78 fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
79 CambriaConstants.kDefault_RateLimitWindowLength);
80 fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
81 CambriaConstants.kDefault_SleepMsOnRateLimit);
82 fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
88 * Construct a rate limiter.
90 * @param maxEmptyPollsPerMinute
91 * Pass <= 0 to deactivate rate limiting.
92 * @param windowLengthMins
94 public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
95 this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
99 * Construct a rate limiter
101 * @param maxEmptyPollsPerMinute
102 * Pass <= 0 to deactivate rate limiting.
104 * @param windowLengthMins
106 public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
107 fRateInfo = new HashMap<>();
108 fRateInfoCheck = new HashMap<>();
109 fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
110 fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
111 fWindowLengthMins = windowLengthMins;
112 fSleepMs = Math.max(0, sleepMs);
113 fSleepMs1 = Math.max(0, sleepMS1);
117 * static method provide the sleep time
119 * @param ratePerMinute
122 public static long getSleepMsForRate(double ratePerMinute) {
123 if (ratePerMinute <= 0.0)
125 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
129 * Tell the rate limiter about a call to a topic/group/id. If the rate is
130 * too high, this call delays its return and throws an exception.
133 * @param consumerGroup
135 * @throws CambriaApiException
137 public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
138 // do nothing if rate is configured 0 or less
139 if (fMaxEmptyPollsPerMinute <= 0) {
142 // setup rate info for this tuple
143 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
144 final double rate = ri.onCall();
145 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
146 if (rate > fMaxEmptyPollsPerMinute) {
148 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
151 log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
152 + " ms sleep, then responding in error.");
153 Thread.sleep(fSleepMs);
156 log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
158 } catch (InterruptedException e) {
159 log.error("Exception "+ e);
164 ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
165 DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
166 "This client is making too many requests. Please use a long poll "
167 + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
169 log.info(errRes.toString());
170 throw new CambriaApiException(errRes);
172 /*if (fMaxPollsPerMinute <= 0) {
176 final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
177 final double ratevalue = ric.onCall();
178 if (ratevalue > fMaxPollsPerMinute) {
180 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
182 if (fSleepMs1 > fMaxPollsPerMinute) {
183 log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
184 + " ms sleep, then responding in error.");
185 Thread.sleep(fSleepMs1);
188 log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
190 } catch (InterruptedException e) {
195 ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
196 DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
197 "This client is making too many requests "
198 + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
200 log.info(errRes.toString());
201 throw new CambriaApiException(errRes);
209 * @param consumerGroup
213 public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
214 // check for good replies
216 // that was a good send, reset the metric
217 getRateInfo(topic, consumerGroup, clientId).reset();
221 private static class RateInfo {
222 private final String fLabel;
223 private final CdmRateTicker fCallRateSinceLastMsgSend;
225 * constructor initialzes
228 * @param windowLengthMinutes
230 public RateInfo(String label, int windowLengthMinutes) {
232 fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
233 windowLengthMinutes, TimeUnit.MINUTES);
236 public String getLabel() {
241 * CdmRateTicker is reset
243 public void reset() {
244 fCallRateSinceLastMsgSend.reset();
251 public double onCall() {
252 fCallRateSinceLastMsgSend.tick();
253 return fCallRateSinceLastMsgSend.getRate();
259 private static class RateInfoCheck {
261 private final String fLabel;
262 private final CdmRateTicker fCallRateSinceLastMsgSend;
264 * constructor initialzes
267 * @param windowLengthMinutes
269 public RateInfoCheck(String label, int windowLengthMinutes) {
271 fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
272 windowLengthMinutes, TimeUnit.MINUTES);
275 public String getLabel() {
280 * CdmRateTicker is reset
282 public void reset() {
283 fCallRateSinceLastMsgSend.reset();
290 public double onCall() {
291 fCallRateSinceLastMsgSend.tick();
292 return fCallRateSinceLastMsgSend.getRate();
299 private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
300 final String key = makeKey(topic, consumerGroup, clientId);
301 RateInfo ri = fRateInfo.get(key);
303 ri = new RateInfo(key, fWindowLengthMins);
304 fRateInfo.put(key, ri);
310 /* private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
311 final String key = makeKey(topic, consumerGroup, clientId);
312 RateInfoCheck ri = fRateInfoCheck.get(key);
314 ri = new RateInfoCheck(key, 1);
315 fRateInfoCheck.put(key, ri);
323 private String makeKey(String topic, String group, String id) {
324 return topic + "::" + group + "::" + id;