Merge "sonar critical for Conditional Statement"
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / beans / DMaaPCambriaLimiter.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.cambria.beans;
23
24 import java.util.HashMap;
25 import java.util.concurrent.TimeUnit;
26
27 //import org.slf4j.Logger;
28 //import org.slf4j.LoggerFactory;
29
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32
33 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.beans.factory.annotation.Qualifier;
35 import org.springframework.stereotype.Component;
36
37 import com.att.nsa.cambria.CambriaApiException;
38 import com.att.nsa.cambria.constants.CambriaConstants;
39 import com.att.nsa.cambria.exception.DMaaPResponseCode;
40 import com.att.nsa.cambria.exception.ErrorResponse;
41 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
42 import com.att.nsa.drumlin.till.nv.rrNvReadable;
43 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
44 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
45 import com.att.nsa.metrics.impl.CdmRateTicker;
46
47 /**
48  * class provide rate information
49  * 
50  * @author author
51  *
52  */
53 @Component
54 public class DMaaPCambriaLimiter {
55         /**
56          * constructor initializes
57          * 
58          * @param settings
59          * @throws missingReqdSetting
60          * @throws invalidSettingValue
61          */
62         @Autowired
63         public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
64                         throws missingReqdSetting, invalidSettingValue {
65                 fRateInfo = new HashMap<String, RateInfo>();
66                 fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
67                                 CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
68                 fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
69                                 CambriaConstants.kDefault_RateLimitWindowLength);
70                 fSleepMs = settings.getLong(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
71                                 CambriaConstants.kDefault_SleepMsOnRateLimit);
72         }
73
74         /**
75          * static method provide the sleep time
76          * 
77          * @param ratePerMinute
78          * @return
79          */
80         public static long getSleepMsForRate(double ratePerMinute) {
81                 if (ratePerMinute <= 0.0)
82                         return 0;
83                 return Math.max(1000, Math.round(60 * 1000 / ratePerMinute));
84         }
85
86         /**
87          * Construct a rate limiter.
88          * 
89          * @param maxEmptyPollsPerMinute
90          *            Pass <= 0 to deactivate rate limiting.
91          *            @param windowLengthMins
92          */
93         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) {
94                 this(maxEmptyPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute));
95         }
96
97         /**
98          * Construct a rate limiter
99          * 
100          * @param maxEmptyPollsPerMinute
101          *            Pass <= 0 to deactivate rate limiting.
102          * @param sleepMs
103          * @param windowLengthMins
104          */
105         public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) {
106                 fRateInfo = new HashMap<String, RateInfo>();
107                 fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
108                 fWindowLengthMins = windowLengthMins;
109                 fSleepMs = Math.max(0, sleepMs);
110         }
111
112         /**
113          * Tell the rate limiter about a call to a topic/group/id. If the rate is
114          * too high, this call delays its return and throws an exception.
115          * 
116          * @param topic
117          * @param consumerGroup
118          * @param clientId
119          * @throws CambriaApiException
120          */
121         public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException {
122                 // do nothing if rate is configured 0 or less
123                 if (fMaxEmptyPollsPerMinute <= 0) {
124                         return;
125                 }
126
127                 // setup rate info for this tuple
128                 final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
129
130                 final double rate = ri.onCall();
131                 log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
132
133                 if (rate > fMaxEmptyPollsPerMinute) {
134                         try {
135                                 log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
136                                                 + ".");
137                                 if (fSleepMs > 0) {
138                                         log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
139                                                         + " ms sleep, then responding in error.");
140                                         Thread.sleep(fSleepMs);
141                                 } else {
142                                         log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
143                                 }
144                         } catch (InterruptedException e) {
145                                 log.error(e.toString());
146                                 Thread.currentThread().interrupt();
147                         }
148                         ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
149                                         DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
150                                         "This client is making too many requests. Please use a long poll "
151                                                         + "setting to decrease the number of requests that result in empty responses. ");
152                         log.info(errRes.toString());
153                         throw new CambriaApiException(errRes);
154                 }
155         }
156
157         /**
158          * 
159          * @param topic
160          * @param consumerGroup
161          * @param clientId
162          * @param sentCount
163          */
164         public void onSend(String topic, String consumerGroup, String clientId, long sentCount) {
165                 // check for good replies
166                 if (sentCount > 0) {
167                         // that was a good send, reset the metric
168                         getRateInfo(topic, consumerGroup, clientId).reset();
169                 }
170         }
171
172         private static class RateInfo {
173                 /**
174                  * constructor initialzes
175                  * 
176                  * @param label
177                  * @param windowLengthMinutes
178                  */
179                 public RateInfo(String label, int windowLengthMinutes) {
180                         fLabel = label;
181                         fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
182                                         windowLengthMinutes, TimeUnit.MINUTES);
183                 }
184
185                 public String getLabel() {
186                         return fLabel;
187                 }
188
189                 /**
190                  * CdmRateTicker is reset
191                  */
192                 public void reset() {
193                         fCallRateSinceLastMsgSend.reset();
194                 }
195
196                 /**
197                  * 
198                  * @return
199                  */
200                 public double onCall() {
201                         fCallRateSinceLastMsgSend.tick();
202                         return fCallRateSinceLastMsgSend.getRate();
203                 }
204
205                 private final String fLabel;
206                 private final CdmRateTicker fCallRateSinceLastMsgSend;
207         }
208
209         private final HashMap<String, RateInfo> fRateInfo;
210         private final double fMaxEmptyPollsPerMinute;
211         private final int fWindowLengthMins;
212         private final long fSleepMs;
213         //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
214         private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
215         private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
216                 final String key = makeKey(topic, consumerGroup, clientId);
217                 RateInfo ri = fRateInfo.get(key);
218                 if (ri == null) {
219                         ri = new RateInfo(key, fWindowLengthMins);
220                         fRateInfo.put(key, ri);
221                 }
222                 return ri;
223         }
224
225         private String makeKey(String topic, String group, String id) {
226                 return topic + "::" + group + "::" + id;
227         }
228 }