revert few sonar fixes
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / beans / DMaaPCambriaLimiter.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.beans;
23
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import org.onap.dmaap.dmf.mr.CambriaApiException;
import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
import org.onap.dmaap.dmf.mr.utils.Utils;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
import com.att.nsa.metrics.impl.CdmRateTicker;
45
46 /**
47  * class provide rate information
48  * 
49  * @author anowarul.islam
50  *
51  */
52 @Component
53 public class DMaaPCambriaLimiter {
54         private final HashMap<String, RateInfo> fRateInfo;
55         private final double fMaxEmptyPollsPerMinute;
56         private final double fMaxPollsPerMinute;
57         private final int fWindowLengthMins;
58         private final long fSleepMs;
59         private final long fSleepMs1;
60         private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
61         
62         /**
63          * constructor initializes
64          * 
65          * @param settings
66          * @throws missingReqdSetting
67          * @throws invalidSettingValue
68          */
69         @Autowired
70         public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) {
71                         fRateInfo = new HashMap<>();
72                 fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
73                                 CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
74                 fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
75                                 30);
76                 fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
77                                 CambriaConstants.kDefault_RateLimitWindowLength);
78                 fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
79                                 CambriaConstants.kDefault_SleepMsOnRateLimit);
80                 fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
81                                 5000);
82                 
83         }
84         
85         /**
86          * Construct a rate limiter.
87          * 
88          * @param maxEmptyPollsPerMinute
89          *            Pass <= 0 to deactivate rate limiting.
90          *            @param windowLengthMins
91          */
92         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
93                 this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
94         }
95
96         /**
97          * Construct a rate limiter
98          * 
99          * @param maxEmptyPollsPerMinute
100          *            Pass <= 0 to deactivate rate limiting.
101          * @param sleepMs
102          * @param windowLengthMins
103          */
104         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
105                 fRateInfo = new HashMap<>();
106                 fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
107                 fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
108                 fWindowLengthMins = windowLengthMins;
109                 fSleepMs = Math.max(0, sleepMs);
110                 fSleepMs1 = Math.max(0, sleepMS1);
111         }
112
113         /**
114          * static method provide the sleep time
115          * 
116          * @param ratePerMinute
117          * @return
118          */
119         public static long getSleepMsForRate(double ratePerMinute) {
120                 if (ratePerMinute <= 0.0)
121                         return 0;
122                 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
123         }
124
125         /**
126          * Tell the rate limiter about a call to a topic/group/id. If the rate is
127          * too high, this call delays its return and throws an exception.
128          * 
129          * @param topic
130          * @param consumerGroup
131          * @param clientId
132          * @throws CambriaApiException
133          */
134         public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
135                 // do nothing if rate is configured 0 or less
136                 if (fMaxEmptyPollsPerMinute <= 0) {
137                         return;
138                 }
139                                 // setup rate info for this tuple
140                 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
141                 final double rate = ri.onCall();
142                 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
143                 if (rate > fMaxEmptyPollsPerMinute) {
144                         try {
145                                 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
146                                                 + ".");
147                                 if (fSleepMs > 0) {
148                                         log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
149                                                         + " ms sleep, then responding in error.");
150                                         Thread.sleep(fSleepMs);
151                                         
152                                 } else {
153                                         log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
154                                 }
155                         } catch (InterruptedException e) {
156                                 log.error("Exception "+ e);
157                                 //Thread.currentThread().interrupt();
158                         }
159                         
160                         
161                         ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
162                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
163                                         "This client is making too many requests. Please use a long poll "
164                                                         + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
165                         
166                         log.info(errRes.toString());
167                         throw new CambriaApiException(errRes);
168                 }
169                 
170                 
171         }
172
173         /**
174          * 
175          * @param topic
176          * @param consumerGroup
177          * @param clientId
178          * @param sentCount
179          */
180         public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
181                 // check for good replies
182                 if (sentCount > 0) {
183                         // that was a good send, reset the metric
184                         getRateInfo(topic, consumerGroup, clientId).reset();
185                 }
186         }
187
188         private static class RateInfo {
189                 private final String fLabel;
190                 private final CdmRateTicker fCallRateSinceLastMsgSend;
191                 /**
192                  * constructor initialzes
193                  * 
194                  * @param label
195                  * @param windowLengthMinutes
196                  */
197                 public RateInfo(String label, int windowLengthMinutes) {
198                         fLabel = label;
199                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
200                                         windowLengthMinutes, TimeUnit.MINUTES);
201                 }
202                 
203                 public String getLabel() {
204                         return fLabel;
205                 }
206
207                 /**
208                  * CdmRateTicker is reset
209                  */
210                 public void reset() {
211                         fCallRateSinceLastMsgSend.reset();
212                 }
213
214                 /**
215                  * 
216                  * @return
217                  */
218                 public double onCall() {
219                         fCallRateSinceLastMsgSend.tick();
220                         return fCallRateSinceLastMsgSend.getRate();
221                 }
222         }
223         
224         
225         
226         
227         
228         
229         
230         private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
231                 final String key = makeKey(topic, consumerGroup, clientId);
232                 RateInfo ri = fRateInfo.get(key);
233                 if (ri == null) {
234                         ri = new RateInfo(key, fWindowLengthMins);
235                         fRateInfo.put(key, ri);
236                 }
237                 return ri;
238         }
239         
240         
241         
242
243         
244         
245         
246         private String makeKey(String topic, String group, String id) {
247                 return topic + "::" + group + "::" + id;
248         }
249 }