update the testcases after the kafka 11 changes
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / beans / DMaaPCambriaLimiter.java
@@ -8,36 +8,38 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+*  
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *
+ *  
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
  *  
  *******************************************************************************/
-package com.att.nsa.cambria.beans;
+package com.att.dmf.mr.beans;
 
+import java.util.Date;
 import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.exception.DMaaPResponseCode;
+import com.att.dmf.mr.exception.ErrorResponse;
+import com.att.dmf.mr.utils.Utils;
+
 //import org.slf4j.Logger;
 //import org.slf4j.LoggerFactory;
 
 import com.att.eelf.configuration.EELFLogger;
 import com.att.eelf.configuration.EELFManager;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Component;
-
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.exception.DMaaPResponseCode;
-import com.att.nsa.cambria.exception.ErrorResponse;
 import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
 import com.att.nsa.drumlin.till.nv.rrNvReadable;
 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
@@ -47,7 +49,7 @@ import com.att.nsa.metrics.impl.CdmRateTicker;
 /**
  * class provide rate information
  * 
- * @author author
+ * @author anowarul.islam
  *
  */
 @Component
@@ -62,13 +64,19 @@ public class DMaaPCambriaLimiter {
        @Autowired
        public DMaaPCambriaLimiter(@Qualifier("propertyReader") rrNvReadable settings)
                        throws missingReqdSetting, invalidSettingValue {
-               fRateInfo = new HashMap<String, RateInfo>();
+                       fRateInfo = new HashMap<String, RateInfo>();
+               fRateInfoCheck = new HashMap<String, RateInfoCheck>();
                fMaxEmptyPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
                                CambriaConstants.kDefault_MaxEmptyPollsPerMinute);
+               fMaxPollsPerMinute = settings.getDouble(CambriaConstants.kSetting_MaxPollsPerMinute,
+                               30);
                fWindowLengthMins = settings.getInt(CambriaConstants.kSetting_RateLimitWindowLength,
                                CambriaConstants.kDefault_RateLimitWindowLength);
-               fSleepMs = settings.getLong(CambriaConstants.kSetting_MaxEmptyPollsPerMinute,
+               fSleepMs = settings.getLong(CambriaConstants.kSetting_SleepMsOnRateLimit,
                                CambriaConstants.kDefault_SleepMsOnRateLimit);
+               fSleepMs1 = settings.getLong(CambriaConstants.kSetting_SleepMsRealOnRateLimit,
+                               5000);
+               
        }
 
        /**
@@ -90,8 +98,8 @@ public class DMaaPCambriaLimiter {
         *            Pass <= 0 to deactivate rate limiting.
         *            @param windowLengthMins
         */
-       public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins) {
-               this(maxEmptyPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute));
+       public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, double maxPollsPerMinute,int windowLengthMins) {
+               this(maxEmptyPollsPerMinute,maxPollsPerMinute, windowLengthMins, getSleepMsForRate(maxEmptyPollsPerMinute),getSleepMsForRate(1));
        }
 
        /**
@@ -102,11 +110,14 @@ public class DMaaPCambriaLimiter {
         * @param sleepMs
         * @param windowLengthMins
         */
-       public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute, int windowLengthMins, long sleepMs) {
+       public DMaaPCambriaLimiter(double maxEmptyPollsPerMinute,double maxPollsPerMinute, int windowLengthMins, long sleepMs ,long sleepMS1) {
                fRateInfo = new HashMap<String, RateInfo>();
+               fRateInfoCheck = new HashMap<String, RateInfoCheck>();
                fMaxEmptyPollsPerMinute = Math.max(0, maxEmptyPollsPerMinute);
+               fMaxPollsPerMinute = Math.max(0, maxPollsPerMinute);
                fWindowLengthMins = windowLengthMins;
                fSleepMs = Math.max(0, sleepMs);
+               fSleepMs1 = Math.max(0, sleepMS1);
        }
 
        /**
@@ -118,40 +129,72 @@ public class DMaaPCambriaLimiter {
         * @param clientId
         * @throws CambriaApiException
         */
-       public void onCall(String topic, String consumerGroup, String clientId) throws CambriaApiException {
+       public void onCall(String topic, String consumerGroup, String clientId,String remoteHost) throws CambriaApiException {
                // do nothing if rate is configured 0 or less
                if (fMaxEmptyPollsPerMinute <= 0) {
                        return;
                }
-
-               // setup rate info for this tuple
+                               // setup rate info for this tuple
                final RateInfo ri = getRateInfo(topic, consumerGroup, clientId);
-
                final double rate = ri.onCall();
                log.info(ri.getLabel() + ": " + rate + " empty replies/minute.");
-
                if (rate > fMaxEmptyPollsPerMinute) {
                        try {
-                               log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
+                               log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxPollsPerMinute
                                                + ".");
                                if (fSleepMs > 0) {
                                        log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
                                                        + " ms sleep, then responding in error.");
                                        Thread.sleep(fSleepMs);
+                                       
                                } else {
                                        log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
                                }
                        } catch (InterruptedException e) {
-                               log.error(e.toString());
-                               Thread.currentThread().interrupt();
+                               // ignore
                        }
+                       
+                       
                        ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
                                        DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
                                        "This client is making too many requests. Please use a long poll "
-                                                       + "setting to decrease the number of requests that result in empty responses. ");
+                                                       + "setting to decrease the number of requests that result in empty responses. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
+                       
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                }
+               /*if (fMaxPollsPerMinute <= 0) {
+                       return;
+               }
+               
+               final RateInfoCheck ric = getRateInfoCheck(topic, consumerGroup, clientId);
+               final double ratevalue = ric.onCall();
+               if (ratevalue > fMaxPollsPerMinute) {
+                       try {
+                               log.warn(ri.getLabel() + ": " + rate + " empty replies/minute, limit is " + fMaxEmptyPollsPerMinute
+                                               + ".");
+                               if (fSleepMs1 > fMaxPollsPerMinute) {
+                               log.warn(ri.getLabel() + ": " + "Slowing response with " + fSleepMs
+                                                       + " ms sleep, then responding in error.");
+                                       Thread.sleep(fSleepMs1);
+                                       ric.reset();
+                               } else {
+                                       log.info(ri.getLabel() + ": " + "No sleep configured, just throwing error.");
+                               }
+                       } catch (InterruptedException e) {
+                               // ignore
+                       }
+                       
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatusCodes.k429_tooManyRequests, 
+                                       DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(), 
+                                       "This client is making too many requests "
+                                                       + ",decrease the number of requests. ","",Utils.getFormattedDate(new Date()),topic,"","",consumerGroup+"/"+clientId,remoteHost);
+                       
+                       log.info(errRes.toString());
+                       throw new CambriaApiException(errRes);
+               }*/
+               
        }
 
        /**
@@ -181,7 +224,7 @@ public class DMaaPCambriaLimiter {
                        fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
                                        windowLengthMinutes, TimeUnit.MINUTES);
                }
-
+               
                public String getLabel() {
                        return fLabel;
                }
@@ -205,13 +248,57 @@ public class DMaaPCambriaLimiter {
                private final String fLabel;
                private final CdmRateTicker fCallRateSinceLastMsgSend;
        }
+       
+       
+       
+       private static class RateInfoCheck {
+               /**
+                * constructor initialzes
+                * 
+                * @param label
+                * @param windowLengthMinutes
+                */
+               public RateInfoCheck(String label, int windowLengthMinutes) {
+                       fLabel = label;
+                       fCallRateSinceLastMsgSend = new CdmRateTicker("Call rate since last msg send", 1, TimeUnit.MINUTES,
+                                       windowLengthMinutes, TimeUnit.MINUTES);
+               }
+
+               public String getLabel() {
+                       return fLabel;
+               }
+
+               /**
+                * CdmRateTicker is reset
+                */
+               public void reset() {
+                       fCallRateSinceLastMsgSend.reset();
+               }
+
+               /**
+                * 
+                * @return
+                */
+               public double onCall() {
+                       fCallRateSinceLastMsgSend.tick();
+                       return fCallRateSinceLastMsgSend.getRate();
+               }
 
+               private final String fLabel;
+               private final CdmRateTicker fCallRateSinceLastMsgSend;
+       }
+       
+       
        private final HashMap<String, RateInfo> fRateInfo;
+       private final HashMap<String, RateInfoCheck> fRateInfoCheck;
        private final double fMaxEmptyPollsPerMinute;
+       private final double fMaxPollsPerMinute;
        private final int fWindowLengthMins;
        private final long fSleepMs;
+       private final long fSleepMs1;
        //private static final Logger log = LoggerFactory.getLogger(DMaaPCambriaLimiter.class);
        private static final EELFLogger log = EELFManager.getInstance().getLogger(DMaaPCambriaLimiter.class);
+       
        private RateInfo getRateInfo(String topic, String consumerGroup, String clientId) {
                final String key = makeKey(topic, consumerGroup, clientId);
                RateInfo ri = fRateInfo.get(key);
@@ -221,7 +308,21 @@ public class DMaaPCambriaLimiter {
                }
                return ri;
        }
+       
+       
+       private RateInfoCheck getRateInfoCheck(String topic, String consumerGroup, String clientId) {
+               final String key = makeKey(topic, consumerGroup, clientId);
+               RateInfoCheck ri = fRateInfoCheck.get(key);
+               if (ri == null) {
+                       ri = new RateInfoCheck(key, 1);
+                       fRateInfoCheck.put(key, ri);
+               }
+               return ri;
+       }
 
+       
+       
+       
        private String makeKey(String topic, String group, String id) {
                return topic + "::" + group + "::" + id;
        }