bump the version
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / beans / DMaaPCambriaLimiter.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.beans;
23
24 import java.util.Date;
25 import java.util.HashMap;
26 import java.util.concurrent.TimeUnit;
27
28 import org.springframework.beans.factory.annotation.Autowired;
29 import org.springframework.beans.factory.annotation.Qualifier;
30 import org.springframework.stereotype.Component;
31
32 import com.att.dmf.mr.CambriaApiException;
33 import com.att.dmf.mr.constants.CambriaConstants;
34 import com.att.dmf.mr.exception.DMaaPResponseCode;
35 import com.att.dmf.mr.exception.ErrorResponse;
36 import com.att.dmf.mr.utils.Utils;
37
38 import com.att.eelf.configuration.EELFLogger;
39 import com.att.eelf.configuration.EELFManager;
40 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
41 import com.att.nsa.drumlin.till.nv.rrNvReadable;
42 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
43 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
44 import com.att.nsa.metrics.impl.CdmRateTicker;
45
46 /**
47  * class provide rate information
48  * 
49  * @author anowarul.islam
50  *
51  */
52 @Component
53 public class DMaaPCambriaLimiter {
54         private final HashMap<String, RateInfo> fRateInfo;
55         private final HashMap<String, RateInfoCheck> fRateInfoCheck;
56         private final double fMaxEmptyPollsPerMinute;
57         private final double fMaxPollsPerMinute;
58         private final int fWindowLengthMins;
59         private final long fSleepMs;
60         private final long fSleepMs1;
61         private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
62         
63         /**
64          * constructor initializes
65          * 
66          * @param settings
67          * @throws missingReqdSetting
68          * @throws invalidSettingValue
69          */
70         @Autowired
71         public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
72                         fRateInfo = new HashMap<>();
73                 fRateInfoCheck = new HashMap<>();
74                 fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
75                                 CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
76                 fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
77                                 30);
78                 fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
79                                 CambriaConstants.kDefault_RateLimitWindowLength);
80                 fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
81                                 CambriaConstants.kDefault_SleepMsOnRateLimit);
82                 fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
83                                 5000);
84                 
85         }
86         
87         /**
88          * Construct a rate limiter.
89          * 
90          * @param maxEmptyPollsPerMinute
91          *            Pass <= 0 to deactivate rate limiting.
92          *            @param windowLengthMins
93          */
94         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
95                 this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
96         }
97
98         /**
99          * Construct a rate limiter
100          * 
101          * @param maxEmptyPollsPerMinute
102          *            Pass <= 0 to deactivate rate limiting.
103          * @param sleepMs
104          * @param windowLengthMins
105          */
106         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
107                 fRateInfo = new HashMap<>();
108                 fRateInfoCheck = new HashMap<>();
109                 fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
110                 fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
111                 fWindowLengthMins = windowLengthMins;
112                 fSleepMs = Math.max(0, sleepMs);
113                 fSleepMs1 = Math.max(0, sleepMS1);
114         }
115
116         /**
117          * static method provide the sleep time
118          * 
119          * @param ratePerMinute
120          * @return
121          */
122         public static long getSleepMsForRate(double ratePerMinute) {
123                 if (ratePerMinute <= 0.0)
124                         return 0;
125                 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
126         }
127
128         /**
129          * Tell the rate limiter about a call to a topic/group/id. If the rate is
130          * too high, this call delays its return and throws an exception.
131          * 
132          * @param topic
133          * @param consumerGroup
134          * @param clientId
135          * @throws CambriaApiException
136          */
137         public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
138                 // do nothing if rate is configured 0 or less
139                 if (fMaxEmptyPollsPerMinute <= 0) {
140                         return;
141                 }
142                                 // setup rate info for this tuple
143                 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
144                 final double rate = ri.onCall();
145                 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
146                 if (rate > fMaxEmptyPollsPerMinute) {
147                         try {
148                                 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
149                                                 + ".");
150                                 if (fSleepMs > 0) {
151                                         log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
152                                                         + " ms sleep, then responding in error.");
153                                         Thread.sleep(fSleepMs);
154                                         
155                                 } else {
156                                         log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
157                                 }
158                         } catch (InterruptedException e) {
159                                 log.error("Exception "+ e);
160                                 // ignore
161                         }
162                         
163                         
164                         ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
165                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
166                                         "This client is making too many requests. Please use a long poll "
167                                                         + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
168                         
169                         log.info(errRes.toString());
170                         throw new CambriaApiException(errRes);
171                 }
172                 
173                 
174         }
175
176         /**
177          * 
178          * @param topic
179          * @param consumerGroup
180          * @param clientId
181          * @param sentCount
182          */
183         public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
184                 // check for good replies
185                 if (sentCount > 0) {
186                         // that was a good send, reset the metric
187                         getRateInfo(topic, consumerGroup, clientId).reset();
188                 }
189         }
190
191         private static class RateInfo {
192                 private final String fLabel;
193                 private final CdmRateTicker fCallRateSinceLastMsgSend;
194                 /**
195                  * constructor initialzes
196                  * 
197                  * @param label
198                  * @param windowLengthMinutes
199                  */
200                 public RateInfo(String label, int windowLengthMinutes) {
201                         fLabel = label;
202                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
203                                         windowLengthMinutes, TimeUnit.MINUTES);
204                 }
205                 
206                 public String getLabel() {
207                         return fLabel;
208                 }
209
210                 /**
211                  * CdmRateTicker is reset
212                  */
213                 public void reset() {
214                         fCallRateSinceLastMsgSend.reset();
215                 }
216
217                 /**
218                  * 
219                  * @return
220                  */
221                 public double onCall() {
222                         fCallRateSinceLastMsgSend.tick();
223                         return fCallRateSinceLastMsgSend.getRate();
224                 }
225         }
226         
227         
228         
229         private static class RateInfoCheck {
230                 
231                 private final String fLabel;
232                 private final CdmRateTicker fCallRateSinceLastMsgSend;
233                 /**
234                  * constructor initialzes
235                  * 
236                  * @param label
237                  * @param windowLengthMinutes
238                  */
239                 public RateInfoCheck(String label, int windowLengthMinutes) {
240                         fLabel = label;
241                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
242                                         windowLengthMinutes, TimeUnit.MINUTES);
243                 }
244
245                 public String getLabel() {
246                         return fLabel;
247                 }
248
249                 /**
250                  * CdmRateTicker is reset
251                  */
252                 public void reset() {
253                         fCallRateSinceLastMsgSend.reset();
254                 }
255
256                 /**
257                  * 
258                  * @return
259                  */
260                 public double onCall() {
261                         fCallRateSinceLastMsgSend.tick();
262                         return fCallRateSinceLastMsgSend.getRate();
263                 }
264         }
265         
266         
267         
268         
269         private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
270                 final String key = makeKey(topic, consumerGroup, clientId);
271                 RateInfo ri = fRateInfo.get(key);
272                 if (ri == null) {
273                         ri = new RateInfo(key, fWindowLengthMins);
274                         fRateInfo.put(key, ri);
275                 }
276                 return ri;
277         }
278         
279         
280         
281
282         
283         
284         
285         private String makeKey(String topic, String group, String id) {
286                 return topic + "::" + group + "::" + id;
287         }
288 }