X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fdmf%2Fmr%2Fbeans%2FDMaaPCambriaLimiter.java;fp=src%2Fmain%2Fjava%2Fcom%2Fatt%2Fnsa%2Fcambria%2Fbeans%2FDMaaPCambriaLimiter.java;h=5f283676666c341515e1824040c3485bcd5bc132;hb=b32effcaf5684d5e2f338a4537b71a2375c534e5;hp=4e9fc02e57acedb4bc2b6f51ef93c9f44f2c9ccb;hpb=0823cb186012c8e6b7de3d979dfabb9f838da7c2;p=dmaap%2Fmessagerouter%2Fmsgrtr.git diff --git a/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java similarity index 63% rename from src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java rename to src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java index 4e9fc02..5f28367 100644 --- a/src/main/java/com/att/nsa/cambria/beans/DMaaPCambriaLimiter.java +++ b/src/main/java/com/att/dmf/mr/beans/DMaaPCambriaLimiter.java @@ -8,36 +8,38 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ -package com.att.nsa.cambria.beans; +package com.att.dmf.mr.beans; +import java.util.Date; import java.util.HashMap; import java.util.concurrent.TimeUnit; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import com.att.dmf.mr.CambriaApiException; +import com.att.dmf.mr.constants.CambriaConstants; +import com.att.dmf.mr.exception.DMaaPResponseCode; +import com.att.dmf.mr.exception.ErrorResponse; +import com.att.dmf.mr.utils.Utils; + //import org.slf4j.Logger; //import org.slf4j.LoggerFactory; import com.att.eelf.configuration.EELFLogger; import com.att.eelf.configuration.EELFManager; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.stereotype.Component; - -import com.att.nsa.cambria.CambriaApiException; -import com.att.nsa.cambria.constants.CambriaConstants; -import com.att.nsa.cambria.exception.DMaaPResponseCode; -import com.att.nsa.cambria.exception.ErrorResponse; import com.att.nsa.drumlin.service.standards.HttpStatusCodes; import com.att.nsa.drumlin.till.nv.rrNvReadable; import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue; @@ -47,7 +49,7 @@ import com.att.nsa.metrics.impl.CdmRateTicker; /** * class provide rate information * - * @author author + * @author anowarul.islam * */ @Component @@ -62,13 +64,19 @@ public class DMaaPCambriaLimiter { @Autowired public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings) throws missingReqdSetting, invalidSettingValue { - fRateInfo = new HashMap(); + fRateInfo = new HashMap(); + fRateInfoCheck = new HashMap(); fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, CambriaConstants.kDefault_MaxEmptyPollsPerMinute); + fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute, + 30); fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength, CambriaConstants.kDefault_RateLimitWindowLength); - fSleepMs = settings.getLong(CambriaConstants.kSetting_MaxEmptyPollsPerMinute, + fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit, CambriaConstants.kDefault_SleepMsOnRateLimit); + fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit, + 5000); + } /** @@ -90,8 +98,8 @@ public class DMaaPCambriaLimiter { * Pass <= 0 to deactivate rate limiting. * @param windowLengthMins */ - public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) { - this(maxEmptyPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute)); + public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) { + this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1)); } /** @@ -102,11 +110,14 @@ public class DMaaPCambriaLimiter { * @param sleepMs * @param windowLengthMins */ - public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) { + public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) { fRateInfo = new HashMap(); + fRateInfoCheck = new HashMap(); fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute); + fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute); fWindowLengthMins = windowLengthMins; fSleepMs = Math.max(0, sleepMs); + fSleepMs1 = Math.max(0, sleepMS1); } /** @@ -118,40 +129,72 @@ public class DMaaPCambriaLimiter { * @param clientId * @throws CambriaApiException */ - public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException { + public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException { // do nothing if rate is configured 0 or less if (fMaxEmptyPollsPerMinute <= 0) { return; } - - // setup rate info for this tuple + // setup rate info for this tuple final RateInfo ri = getRateInfo(topic, consumerGroup, clientId); - final double rate = ri.onCall(); log.info(ri.getLabel() + ": " + rate + " empty replies/minute."); - if (rate > fMaxEmptyPollsPerMinute) { try { - log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute + log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute + "."); if (fSleepMs > 0) { log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs + " ms sleep, then responding in error."); Thread.sleep(fSleepMs); + } else { log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); } } catch (InterruptedException e) { - log.error(e.toString()); - Thread.currentThread().interrupt(); + // ignore } + + ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), "This client is making too many requests. Please use a long poll " - + "setting to decrease the number of requests that result in empty responses. "); + + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost); + log.info(errRes.toString()); throw new CambriaApiException(errRes); } + /*if (fMaxPollsPerMinute <= 0) { + return; + } + + final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId); + final double ratevalue = ric.onCall(); + if (ratevalue > fMaxPollsPerMinute) { + try { + log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute + + "."); + if (fSleepMs1 > fMaxPollsPerMinute) { + log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs + + " ms sleep, then responding in error."); + Thread.sleep(fSleepMs1); + ric.reset(); + } else { + log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error."); + } + } catch (InterruptedException e) { + // ignore + } + + + ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, + DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), + "This client is making too many requests " + + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost); + + log.info(errRes.toString()); + throw new CambriaApiException(errRes); + }*/ + } /** @@ -181,7 +224,7 @@ public class DMaaPCambriaLimiter { fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES, windowLengthMinutes, TimeUnit.MINUTES); } - + public String getLabel() { return fLabel; } @@ -205,13 +248,57 @@ public class DMaaPCambriaLimiter { private final String fLabel; private final CdmRateTicker fCallRateSinceLastMsgSend; } + + + + private static class RateInfoCheck { + /** + * constructor initialzes + * + * @param label + * @param windowLengthMinutes + */ + public RateInfoCheck(String label, int windowLengthMinutes) { + fLabel = label; + fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES, + windowLengthMinutes, TimeUnit.MINUTES); + } + + public String getLabel() { + return fLabel; + } + + /** + * CdmRateTicker is reset + */ + public void reset() { + fCallRateSinceLastMsgSend.reset(); + } + + /** + * + * @return + */ + public double onCall() { + fCallRateSinceLastMsgSend.tick(); + return fCallRateSinceLastMsgSend.getRate(); + } + private final String fLabel; + private final CdmRateTicker fCallRateSinceLastMsgSend; + } + + private final HashMap fRateInfo; + private final HashMap fRateInfoCheck; private final double fMaxEmptyPollsPerMinute; + private final double fMaxPollsPerMinute; private final int fWindowLengthMins; private final long fSleepMs; + private final long fSleepMs1; //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class); private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class); + private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) { final String key = makeKey(topic, consumerGroup, clientId); RateInfo ri = fRateInfo.get(key); @@ -221,7 +308,21 @@ public class DMaaPCambriaLimiter { } return ri; } + + + private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) { + final String key = makeKey(topic, consumerGroup, clientId); + RateInfoCheck ri = fRateInfoCheck.get(key); + if (ri == null) { + ri = new RateInfoCheck(key, 1); + fRateInfoCheck.put(key, ri); + } + return ri; + } + + + private String makeKey(String topic, String group, String id) { return topic + "::" + group + "::" + id; }