1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.cambria.beans;
24 import java.util.HashMap;
25 import java.util.concurrent.TimeUnit;
27 //import org.slf4j.Logger;
28 //import org.slf4j.LoggerFactory;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
33 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.beans.factory.annotation.Qualifier;
35 import org.springframework.stereotype.Component;
37 import com.att.nsa.cambria.CambriaApiException;
38 import com.att.nsa.cambria.constants.CambriaConstants;
39 import com.att.nsa.cambria.exception.DMaaPResponseCode;
40 import com.att.nsa.cambria.exception.ErrorResponse;
41 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
42 import com.att.nsa.drumlin.till.nv.rrNvReadable;
43 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
44 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
45 import com.att.nsa.metrics.impl.CdmRateTicker;
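/**
 * Rate limiter for consumer polls: tracks the rate of empty replies per
 * topic/consumer group/client id and throttles clients that exceed the
 * configured limit.
 */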
54 public class DMaaPCambriaLimiter {
/**
 * Constructs a rate limiter from configuration settings. Each value falls
 * back to its CambriaConstants default when the setting is absent.
 *
 * @param settings
 *            reader supplying the max-empty-polls, window-length and
 *            sleep-time settings
 * @throws missingReqdSetting
 *             declared for the settings reader; every lookup here passes a
 *             default (NOTE(review): confirm whether it can still be thrown)
 * @throws invalidSettingValue
 *             declared for the settings reader (presumably raised when a
 *             configured value cannot be parsed; verify against rrNvReadable)
 */
@Autowired
public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
        throws missingReqdSetting, invalidSettingValue {
    fRateInfo = new HashMap<String, RateInfo>();
    fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
            CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
    fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
            CambriaConstants.kDefault_RateLimitWindowLength);
    // The sleep time must come from its own setting. Reading it under the
    // max-empty-polls key made a configured poll rate (e.g. 10) double as a
    // 10 ms sleep.
    // NOTE(review): assumes CambriaConstants declares kSetting_SleepMsOnRateLimit
    // alongside kDefault_SleepMsOnRateLimit; confirm before merging.
    fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
            CambriaConstants.kDefault_SleepMsOnRateLimit);
}
75 * static method provide the sleep time
77 * @param ratePerMinute
80 public static long getSleepMsForRate(double ratePerMinute) {
81 if (ratePerMinute <= 0.0)
83 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
/**
 * Constructs a rate limiter whose sleep time is derived from the allowed
 * rate via {@link #getSleepMsForRate(double)}.
 *
 * @param maxEmptyPollsPerMinute
 *            Pass <= 0 to deactivate rate limiting.
 * @param windowLengthMins
 *            length, in minutes, of the window used when measuring the rate
 */
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) {
    this(
            maxEmptyPollsPerMinute,
            windowLengthMins,
            getSleepMsForRate(maxEmptyPollsPerMinute));
}
/**
 * Constructs a rate limiter with an explicit sleep time.
 *
 * @param maxEmptyPollsPerMinute
 *            Pass <= 0 to deactivate rate limiting; negative values are
 *            stored as 0.
 * @param windowLengthMins
 *            length, in minutes, of the window used when measuring the rate
 * @param sleepMs
 *            milliseconds to delay a response that exceeds the limit;
 *            negative values are stored as 0
 */
public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) {
    fWindowLengthMins = windowLengthMins;
    fSleepMs = Math.max(0L, sleepMs);
    fMaxEmptyPollsPerMinute = Math.max(0.0, maxEmptyPollsPerMinute);
    fRateInfo = new HashMap<String, RateInfo>();
}
113 * Tell the rate limiter about a call to a topic/group/id. If the rate is
114 * too high, this call delays its return and throws an exception.
117 * @param consumerGroup
119 * @throws CambriaApiException
121 public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException {
122 // do nothing if rate is configured 0 or less
123 if (fMaxEmptyPollsPerMinute <= 0) {
127 // setup rate info for this tuple
128 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
130 final double rate = ri.onCall();
131 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
133 if (rate > fMaxEmptyPollsPerMinute) {
135 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
138 log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
139 + " ms sleep, then responding in error.");
140 Thread.sleep(fSleepMs);
142 log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
144 } catch (InterruptedException e) {
145 log.error(e.toString());
146 Thread.currentThread().interrupt();
148 ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
149 DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
150 "This client is making too many requests. Please use a long poll "
151 + "setting to decrease the number of requests that result in empty responses. ");
152 log.info(errRes.toString());
153 throw new CambriaApiException(errRes);
160 * @param consumerGroup
164 public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
165 // check for good replies
167 // that was a good send, reset the metric
168 getRateInfo(topic, consumerGroup, clientId).reset();
172 private static class RateInfo {
174 * constructor initialzes
177 * @param windowLengthMinutes
179 public RateInfo(String label, int windowLengthMinutes) {
181 fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
182 windowLengthMinutes, TimeUnit.MINUTES);
185 public String getLabel() {
190 * CdmRateTicker is reset
192 public void reset() {
193 fCallRateSinceLastMsgSend.reset();
200 public double onCall() {
201 fCallRateSinceLastMsgSend.tick();
202 return fCallRateSinceLastMsgSend.getRate();
205 private final String fLabel;
206 private final CdmRateTicker fCallRateSinceLastMsgSend;
209 private final HashMap<String, RateInfo> fRateInfo;
210 private final double fMaxEmptyPollsPerMinute;
211 private final int fWindowLengthMins;
212 private final long fSleepMs;
213 //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
214 private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
215 private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
216 final String key = makeKey(topic, consumerGroup, clientId);
217 RateInfo ri = fRateInfo.get(key);
219 ri = new RateInfo(key, fWindowLengthMins);
220 fRateInfo.put(key, ri);
225 private String makeKey(String topic, String group, String id) {
226 return topic + "::" + group + "::" + id;