* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*
*******************************************************************************/
-package com.att.nsa.cambria.beans;
+package com.att.dmf.mr.beans;
+import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.exception.DMaaPResponseCode;
+import com.att.dmf.mr.exception.ErrorResponse;
+import com.att.dmf.mr.utils.Utils;
+
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
-
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.exception.DMaaPResponseCode;
-import com.att.nsa.cambria.exception.ErrorResponse;
import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
import com.att.nsa.drumlin.till.nv.rrNvReadable;
import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
/**
* class provide rate information
*
- * @author author
+ * @author anowarul.islam
*
*/
@Component
@Autowired
public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
throws missingReqdSetting, invalidSettingValue {
- fRateInfo = new HashMap<String, RateInfo>();
+ fRateInfo = new HashMap<String, RateInfo>();
+ fRateInfoCheck = new HashMap<String, RateInfoCheck>();
fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
+ fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
+ 30);
fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
CambriaConstants.kDefault_RateLimitWindowLength);
- fSleepMs = settings.getLong(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
+ fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
CambriaConstants.kDefault_SleepMsOnRateLimit);
+ fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
+ 5000);
+
}
/**
* Pass <= 0 to deactivate rate limiting.
* @param windowLengthMins
*/
- public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) {
- this(maxEmptyPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute));
+ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
+ this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
}
/**
* @param sleepMs
* @param windowLengthMins
*/
- public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) {
+ public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
fRateInfo = new HashMap<String, RateInfo>();
+ fRateInfoCheck = new HashMap<String, RateInfoCheck>();
fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
+ fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
fWindowLengthMins = windowLengthMins;
fSleepMs = Math.max(0, sleepMs);
+ fSleepMs1 = Math.max(0, sleepMS1);
}
/**
* @param clientId
* @throws CambriaApiException
*/
- public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException {
+ public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
// do nothing if rate is configured 0 or less
if (fMaxEmptyPollsPerMinute <= 0) {
return;
}
-
- // setup rate info for this tuple
+ // setup rate info for this tuple
final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
-
final double rate = ri.onCall();
log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
-
if (rate > fMaxEmptyPollsPerMinute) {
try {
- log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
+ log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
+ ".");
if (fSleepMs > 0) {
log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
+ " ms sleep, then responding in error.");
Thread.sleep(fSleepMs);
+
} else {
log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
}
} catch (InterruptedException e) {
- log.error(e.toString());
- Thread.currentThread().interrupt();
+ // ignore
}
+
+
ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
"This client is making too many requests. Please use a long poll "
- + "setting to decrease the number of requests that result in empty responses. ");
+ + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
+
log.info(errRes.toString());
throw new CambriaApiException(errRes);
}
+ /*if (fMaxPollsPerMinute <= 0) {
+ return;
+ }
+
+ final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
+ final double ratevalue = ric.onCall();
+ if (ratevalue > fMaxPollsPerMinute) {
+ try {
+ log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
+ + ".");
+ if (fSleepMs1 > fMaxPollsPerMinute) {
+ log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
+ + " ms sleep, then responding in error.");
+ Thread.sleep(fSleepMs1);
+ ric.reset();
+ } else {
+ log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+
+ ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests,
+ DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
+ "This client is making too many requests "
+ + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
+
+ log.info(errRes.toString());
+ throw new CambriaApiException(errRes);
+ }*/
+
}
/**
fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
windowLengthMinutes, TimeUnit.MINUTES);
}
-
+
public String getLabel() {
return fLabel;
}
private final String fLabel;
private final CdmRateTicker fCallRateSinceLastMsgSend;
}
+
+
+
+ private static class RateInfoCheck {
+ /**
+ * constructor initialzes
+ *
+ * @param label
+ * @param windowLengthMinutes
+ */
+ public RateInfoCheck(String label, int windowLengthMinutes) {
+ fLabel = label;
+ fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
+ windowLengthMinutes, TimeUnit.MINUTES);
+ }
+
+ public String getLabel() {
+ return fLabel;
+ }
+
+ /**
+ * CdmRateTicker is reset
+ */
+ public void reset() {
+ fCallRateSinceLastMsgSend.reset();
+ }
+
+ /**
+ *
+ * @return
+ */
+ public double onCall() {
+ fCallRateSinceLastMsgSend.tick();
+ return fCallRateSinceLastMsgSend.getRate();
+ }
+ private final String fLabel;
+ private final CdmRateTicker fCallRateSinceLastMsgSend;
+ }
+
+
private final HashMap<String, RateInfo> fRateInfo;
+ private final HashMap<String, RateInfoCheck> fRateInfoCheck;
private final double fMaxEmptyPollsPerMinute;
+ private final double fMaxPollsPerMinute;
private final int fWindowLengthMins;
private final long fSleepMs;
+ private final long fSleepMs1;
//private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+
private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
final String key = makeKey(topic, consumerGroup, clientId);
RateInfo ri = fRateInfo.get(key);
}
return ri;
}
+
+
+ private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
+ final String key = makeKey(topic, consumerGroup, clientId);
+ RateInfoCheck ri = fRateInfoCheck.get(key);
+ if (ri == null) {
+ ri = new RateInfoCheck(key, 1);
+ fRateInfoCheck.put(key, ri);
+ }
+ return ri;
+ }
+
+
+
private String makeKey(String topic, String group, String id) {
return topic + "::" + group + "::" + id;
}