DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPCambriaLimiter.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
23
24 import com.att.eelf.configuration.EELFLogger;
25 import com.att.eelf.configuration.EELFManager;
26 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
27 import com.att.nsa.drumlin.till.nv.rrNvReadable;
28 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
29 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
30 import com.att.nsa.metrics.impl.CdmRateTicker;
31 import org.onap.dmaap.dmf.mr.CambriaApiException;
32 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
33 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
34 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
35 import org.onap.dmaap.dmf.mr.utils.Utils;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.beans.factory.annotation.Qualifier;
38 import org.springframework.stereotype.Component;
39
40 import java.util.Date;
41 import java.util.HashMap;
42 import java.util.concurrent.TimeUnit;
43
44 /**
45  * class provide rate information
46  * 
47  * @author anowarul.islam
48  *
49  */
50 @Component
51 public class DMaaPCambriaLimiter {
52         private final HashMap<String, RateInfo> fRateInfo;
53         private final double fMaxEmptyPollsPerMinute;
54         private final double fMaxPollsPerMinute;
55         private final int fWindowLengthMins;
56         private final long fSleepMs;
57         private final long fSleepMs1;
58         private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
59         
60         /**
61          * constructor initializes
62          * 
63          * @param settings
64          * @throws missingReqdSetting
65          * @throws invalidSettingValue
66          */
67         @Autowired
68         public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
69                         fRateInfo = new HashMap<>();
70                 fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
71                                 CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
72                 fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
73                                 30);
74                 fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
75                                 CambriaConstants.kDefault_RateLimitWindowLength);
76                 fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
77                                 CambriaConstants.kDefault_SleepMsOnRateLimit);
78                 fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
79                                 5000);
80                 
81         }
82         
83         /**
84          * Construct a rate limiter.
85          * 
86          * @param maxEmptyPollsPerMinute
87          *            Pass <= 0 to deactivate rate limiting.
88          *            @param windowLengthMins
89          */
90         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
91                 this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
92         }
93
94         /**
95          * Construct a rate limiter
96          * 
97          * @param maxEmptyPollsPerMinute
98          *            Pass <= 0 to deactivate rate limiting.
99          * @param sleepMs
100          * @param windowLengthMins
101          */
102         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
103                 fRateInfo = new HashMap<>();
104                 fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
105                 fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
106                 fWindowLengthMins = windowLengthMins;
107                 fSleepMs = Math.max(0, sleepMs);
108                 fSleepMs1 = Math.max(0, sleepMS1);
109         }
110
111         /**
112          * static method provide the sleep time
113          * 
114          * @param ratePerMinute
115          * @return
116          */
117         public static long getSleepMsForRate(double ratePerMinute) {
118                 if (ratePerMinute <= 0.0)
119                         return 0;
120                 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
121         }
122
123         /**
124          * Tell the rate limiter about a call to a topic/group/id. If the rate is
125          * too high, this call delays its return and throws an exception.
126          * 
127          * @param topic
128          * @param consumerGroup
129          * @param clientId
130          * @throws CambriaApiException
131          */
132         public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
133                 // do nothing if rate is configured 0 or less
134                 if (fMaxEmptyPollsPerMinute <= 0) {
135                         return;
136                 }
137                                 // setup rate info for this tuple
138                 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
139                 final double rate = ri.onCall();
140                 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
141                 if (rate > fMaxEmptyPollsPerMinute) {
142                         try {
143                                 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
144                                                 + ".");
145                                 if (fSleepMs > 0) {
146                                         log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
147                                                         + " ms sleep, then responding in error.");
148                                         Thread.sleep(fSleepMs);
149                                         
150                                 } else {
151                                         log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
152                                 }
153                         } catch (InterruptedException e) {
154                                 log.error("Exception "+ e);
155                                 Thread.currentThread().interrupt();
156                         }
157                         
158                         
159                         ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
160                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
161                                         "This client is making too many requests. Please use a long poll "
162                                                         + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
163                         
164                         log.info(errRes.toString());
165                         throw new CambriaApiException(errRes);
166                 }
167                 
168                 
169         }
170
171         /**
172          * 
173          * @param topic
174          * @param consumerGroup
175          * @param clientId
176          * @param sentCount
177          */
178         public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
179                 // check for good replies
180                 if (sentCount > 0) {
181                         // that was a good send, reset the metric
182                         getRateInfo(topic, consumerGroup, clientId).reset();
183                 }
184         }
185
186         private static class RateInfo {
187                 private final String fLabel;
188                 private final CdmRateTicker fCallRateSinceLastMsgSend;
189                 /**
190                  * constructor initialzes
191                  * 
192                  * @param label
193                  * @param windowLengthMinutes
194                  */
195                 public RateInfo(String label, int windowLengthMinutes) {
196                         fLabel = label;
197                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
198                                         windowLengthMinutes, TimeUnit.MINUTES);
199                 }
200                 
201                 public String getLabel() {
202                         return fLabel;
203                 }
204
205                 /**
206                  * CdmRateTicker is reset
207                  */
208                 public void reset() {
209                         fCallRateSinceLastMsgSend.reset();
210                 }
211
212                 /**
213                  * 
214                  * @return
215                  */
216                 public double onCall() {
217                         fCallRateSinceLastMsgSend.tick();
218                         return fCallRateSinceLastMsgSend.getRate();
219                 }
220         }
221         
222         
223         
224         
225         
226         
227         
228         private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
229                 final String key = makeKey(topic, consumerGroup, clientId);
230                 RateInfo ri = fRateInfo.get(key);
231                 if (ri == null) {
232                         ri = new RateInfo(key, fWindowLengthMins);
233                         fRateInfo.put(key, ri);
234                 }
235                 return ri;
236         }
237         
238         
239         
240
241         
242         
243         
244         private String makeKey(String topic, String group, String id) {
245                 return topic + "::" + group + "::" + id;
246         }
247 }