EventsService authorization refactor 54/91454/3
authorTomek Kaminski <tomasz.kaminski@nokia.com>
Wed, 10 Jul 2019 10:17:35 +0000 (12:17 +0200)
committerTomek Kaminski <tomasz.kaminski@nokia.com>
Tue, 16 Jul 2019 11:41:21 +0000 (13:41 +0200)
-ErrorResponseProvider extracted
-subscriber method handle with refactor
-publisher method handle with partial refactor

Issue-ID: DMAAP-1230
Signed-off-by: Tomek Kaminski <tomasz.kaminski@nokia.com>
Change-Id: I92d53edd0c791f8ce90275776b3031bb0047810e
Signed-off-by: Tomek Kaminski <tomasz.kaminski@nokia.com>
13 files changed:
src/main/java/org/onap/dmaap/dmf/mr/exception/DMaaPErrorMessages.java
src/main/java/org/onap/dmaap/dmf/mr/resources/CambriaOutboundEventStream.java
src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticator.java
src/main/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImpl.java
src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java [new file with mode: 0644]
src/main/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImpl.java
src/main/java/org/onap/dmaap/dmf/mr/utils/Utils.java
src/test/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImplTest.java [new file with mode: 0644]
src/test/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImplTest.java [new file with mode: 0644]
src/test/java/org/onap/dmaap/mr/cambria/security/DMaaPAAFAuthenticatorImplTest.java [deleted file]
src/test/java/org/onap/dmaap/mr/cambria/security/JUnitTestSuite.java
src/test/java/org/onap/dmaap/mr/cambria/service/impl/EventsServiceImplTest.java [deleted file]
src/test/java/org/onap/dmaap/mr/cambria/service/impl/JUnitTestSuite.java

index 7b68b42..95b0577 100644 (file)
@@ -58,10 +58,10 @@ public class DMaaPErrorMessages {
        private String topicsfailure="Failed to retrieve list of all topics.";
        
        //@Value("${not.permitted.access.1}")
-       private String notPermitted1="Access Denied.User does not have permission to perform";
+       private String notPermitted1="Access Denied.User does not have permission to perform ";
        
        //@Value("${not.permitted.access.2}")
-       private String notPermitted2="operation on Topic:";
+       private String notPermitted2=" operation on Topic:";
        
        //@Value("${get.topic.details.failure}")
        private String topicDetailsFail="Failed to retrieve details of topic:";
index 537fc22..2f7f1db 100644 (file)
@@ -548,7 +548,35 @@ public class CambriaOutboundEventStream implements StreamWriter {
        private ArrayList<Consumer> fKafkaConsumerList;
        private boolean istransType = true;
        // private static final Logger log =
-       
+
 
        private static final EELFLogger log = EELFManager.getInstance().getLogger(CambriaOutboundEventStream.class);
+
+       public int getfLimit() {
+               return fLimit;
+       }
+
+       public int getfTimeoutMs() {
+               return fTimeoutMs;
+       }
+
+       public boolean isfPretty() {
+               return fPretty;
+       }
+
+       public boolean isfWithMeta() {
+               return fWithMeta;
+       }
+
+       public boolean isAAFTopic() {
+               return isAAFTopic;
+       }
+
+       public boolean isIstransEnable() {
+               return istransEnable;
+       }
+
+       public boolean isIstransType() {
+               return istransType;
+       }
 }
\ No newline at end of file
index a7f2376..12f0465 100644 (file)
 package org.onap.dmaap.dmf.mr.security;
 
 import javax.servlet.http.HttpServletRequest;
-
 import org.onap.dmaap.dmf.mr.CambriaApiException;
 
-
-
-
 /**
  * 
  * @author sneha.d.desai
index 25644a7..0e6b0f6 100644 (file)
@@ -21,6 +21,7 @@
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.security;
 
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
 import javax.servlet.http.HttpServletRequest;
 
 import org.onap.dmaap.dmf.mr.CambriaApiException;
@@ -34,47 +35,37 @@ import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
  */
 public class DMaaPAAFAuthenticatorImpl implements DMaaPAAFAuthenticator {
 
+       private static final String NAMESPACE_PROPERTY = "defaultNSforUEB";
+  private static final String DEFAULT_NAMESPACE = "org.onap.dmaap.mr";
+  private static final String NAMESPACE_PREFIX = "org.onap";
+
        /**
         * @param req
         * @param role
         */
        @Override
        public boolean aafAuthentication(HttpServletRequest req, String role) {
-               boolean auth = false;
-               if(req.isUserInRole(role))
-               {
-                       
-                       auth = true;
-               }
-               
-               return auth;
+               return req.isUserInRole(role);
        }
 
        @Override
        public String aafPermissionString(String topicName, String action) throws CambriaApiException {
-               
-               
-               String permission = "";
-               String nameSpace ="";
-               if(topicName.contains(".") && topicName.contains("org.onap")) {
-                       
-                       nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
-               }
-               else {
-                       nameSpace = null;
-                        nameSpace= com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"defaultNSforUEB");
-                       
-                       if(null==nameSpace)nameSpace="org.onap.dmaap.mr";
-                       
-                       
-                       
-               }
-               
-               permission = nameSpace+".topic|:topic."+topicName+"|"+action;
-               return permission;
-               
+
+               String nameSpace = topicName.startsWith(NAMESPACE_PREFIX) ? parseNamespace(topicName) :
+                       readNamespaceFromProperties();
+
+               nameSpace = !nameSpace.isEmpty()? nameSpace : DEFAULT_NAMESPACE;
+
+               return new StringBuilder(nameSpace).append(".topic|:topic.").append(topicName)
+                       .append("|").append(action).toString();
+       }
+
+       String readNamespaceFromProperties() {
+               return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,NAMESPACE_PROPERTY);
+       }
+
+       private String parseNamespace(String topicName) {
+               return topicName.substring(0,topicName.lastIndexOf('.'));
        }
-       
-       
 
 }
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/ErrorResponseProvider.java
new file mode 100644 (file)
index 0000000..50a611e
--- /dev/null
@@ -0,0 +1,147 @@
+/*******************************************************************************
+ *  ============LICENSE_START===================================================
+ *  org.onap.dmaap
+ *  ============================================================================
+ *  Copyright © 2019 Nokia Intellectual Property. All rights reserved.
+ *  ============================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=====================================================
+ ******************************************************************************/
+package org.onap.dmaap.dmf.mr.service.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.Date;
+import org.apache.http.HttpStatus;
+import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
+import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
+import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
+import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
+import org.onap.dmaap.dmf.mr.utils.Utils;
+
+class ErrorResponseProvider {
+
+    private String clientId;
+    private String topicName;
+    private String consumerGroup;
+    private String remoteHost;
+    private String publisherId;
+    private String publisherIp;
+    private DMaaPErrorMessages errorMessages;
+
+    private ErrorResponseProvider() {
+
+    }
+
+    ErrorResponse getIpBlacklistedError(String remoteAddr) {
+        return new ErrorResponse(HttpStatus.SC_FORBIDDEN,
+            DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+            "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
+            null, Utils.getFormattedDate(new Date()), topicName, publisherId,
+            publisherIp, consumerGroup + "/" + clientId, remoteHost);
+    }
+
+    ErrorResponse getTopicNotFoundError() {
+        return new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+            DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
+            errorMessages.getTopicNotExist() + "-[" + topicName + "]", null, Utils.getFormattedDate(new Date()),
+            topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost);
+    }
+
+    ErrorResponse getAafAuthorizationError(String permission, String action) {
+        return new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+            DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+            errorMessages.getNotPermitted1() + action + errorMessages.getNotPermitted2() + topicName + " on "
+                + permission,
+            null, Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId,
+            remoteHost);
+    }
+
+    ErrorResponse getServiceUnavailableError(String msg) {
+        return new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+            DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+            errorMessages.getServerUnav() + msg, null, Utils.getFormattedDate(new Date()), topicName,
+            publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost);
+    }
+
+    ErrorResponse getConcurrentModificationError() {
+        return new ErrorResponse(HttpStatus.SC_CONFLICT,
+            DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
+            "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null,
+            Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost);
+    }
+
+    ErrorResponse getGenericError(String msg) {
+        return new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
+            DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
+            "Couldn't respond to client, closing cambria consumer" + msg, null,
+            Utils.getFormattedDate(new Date()), topicName, publisherId, publisherIp, consumerGroup + "/" + clientId, remoteHost);
+    }
+
+    public static class Builder {
+
+        private String clientId;
+        private String topicName;
+        private String consumerGroup;
+        private String remoteHost;
+        private String publisherId;
+        private String publisherIp;
+        DMaaPErrorMessages errorMessages;
+
+        Builder withErrorMessages(DMaaPErrorMessages errorMessages) {
+            this.errorMessages = errorMessages;
+            return this;
+        }
+
+        Builder withTopic(String topic) {
+            this.topicName = topic;
+            return this;
+        }
+
+        Builder withClient(String client) {
+            this.clientId = client;
+            return this;
+        }
+
+        Builder withConsumerGroup(String consumerGroup) {
+            this.consumerGroup = consumerGroup;
+            return this;
+        }
+
+        Builder withRemoteHost(String remoteHost) {
+            this.remoteHost = remoteHost;
+            return this;
+        }
+
+        Builder withPublisherId(String publisherId) {
+            this.publisherId = publisherId;
+            return this;
+        }
+
+        Builder withPublisherIp(String publisherIp) {
+            this.publisherIp = publisherIp;
+            return this;
+        }
+
+        public ErrorResponseProvider build() {
+            Preconditions.checkArgument(errorMessages!=null);
+            ErrorResponseProvider errRespProvider = new ErrorResponseProvider();
+            errRespProvider.errorMessages = this.errorMessages;
+            errRespProvider.clientId = this.clientId;
+            errRespProvider.consumerGroup = this.consumerGroup;
+            errRespProvider.topicName = this.topicName;
+            errRespProvider.remoteHost = this.remoteHost;
+            errRespProvider.publisherId = this.publisherId;
+            errRespProvider.publisherIp = this.publisherIp;
+            return errRespProvider;
+        }
+    }
+}
index 11c544f..ec5bfc0 100644 (file)
  *******************************************************************************/
 package org.onap.dmaap.dmf.mr.service.impl;
 
+import com.att.ajsc.filemonitor.AJSCPropertiesMap;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.drumlin.service.standards.MimeTypes;
+import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
+import com.att.nsa.security.NsaApiKey;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.util.rrConvertor;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.ConcurrentModificationException;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.Properties;
-
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.MediaType;
-
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.http.HttpStatus;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.json.JSONObject;
 import org.json.JSONTokener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.stereotype.Service;
-
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
 import org.onap.dmaap.dmf.mr.CambriaApiException;
 import org.onap.dmaap.dmf.mr.backends.Consumer;
 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
@@ -56,7 +54,6 @@ import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
 import org.onap.dmaap.dmf.mr.backends.MetricsSet;
 import org.onap.dmaap.dmf.mr.backends.Publisher;
 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
-import org.onap.dmaap.dmf.mr.backends.kafka.KafkaLiveLockAvoider2;
 import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
 import org.onap.dmaap.dmf.mr.beans.LogDetails;
@@ -65,7 +62,6 @@ import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
-
 import org.onap.dmaap.dmf.mr.metabroker.Topic;
 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet;
 import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
@@ -74,15 +70,10 @@ import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
 import org.onap.dmaap.dmf.mr.service.EventsService;
 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
+import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder.StreamWriter;
 import org.onap.dmaap.dmf.mr.utils.Utils;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.security.NsaApiKey;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.util.rrConvertor;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
 
 /**
  * This class provides the functinality to publish and subscribe message to
@@ -93,20 +84,20 @@ import com.att.nsa.util.rrConvertor;
  */
 @Service
 public class EventsServiceImpl implements EventsService {
-       // private static final Logger LOG =
        
        private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventsServiceImpl.class);
-
        private static final String BATCH_LENGTH = "event.batch.length";
        private static final String TRANSFER_ENCODING = "Transfer-Encoding";
+       private static final String TIMEOUT_PROPERTY = "timeout";
+       private static final String SUBSCRIBE_ACTION = "sub";
+       private static final String PUBLISH_ACTION = "pub";
+
        @Autowired
        private DMaaPErrorMessages errorMessages;
-       
-       //@Autowired
-       
 
-       // @Value("${metrics.send.cambria.topic}")
-       
+       String getPropertyFromAJSCmap(String propertyKey) {
+               return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
+       }
 
        public DMaaPErrorMessages getErrorMessages() {
                return errorMessages;
@@ -129,222 +120,91 @@ public class EventsServiceImpl implements EventsService {
         */
        @Override
        public void getEvents(DMaaPContext ctx, String topic, String consumerGroup, String clientId)
-                       throws ConfigDbException, TopicExistsException, AccessDeniedException, UnavailableException,
-                       CambriaApiException, IOException, DMaaPAccessDeniedException {
+                       throws ConfigDbException, AccessDeniedException, UnavailableException,
+                       CambriaApiException, IOException {
+
                final long startTime = System.currentTimeMillis();
                final HttpServletRequest req = ctx.getRequest();
-       
-               boolean isAAFTopic = false;
-               // was this host blacklisted?
-               final String remoteAddr = Utils.getRemoteAddress(ctx);
-               if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                       "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
-                                       null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
-                                       ctx.getRequest().getRemoteHost(), null, null);
-                       LOG.info(errRes.toString());
-                       throw new CambriaApiException(errRes);
-               }
-
-               int limit = CambriaConstants.kNoLimit;
-               if (req.getParameter("limit") != null) {
-                       limit = Integer.parseInt(req.getParameter("limit"));
-               }
-
-               int timeoutMs = CambriaConstants.kNoTimeout;
-               String strtimeoutMS = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
-               if (strtimeoutMS != null)
-                       timeoutMs = Integer.parseInt(strtimeoutMS);
-               // int timeoutMs = ctx.getConfigReader().getSettings().getInt("timeout",
-               
-               if (req.getParameter("timeout") != null) {
-                       timeoutMs = Integer.parseInt(req.getParameter("timeout"));
-               }
-
-               // By default no filter is applied if filter is not passed as a
-               // parameter in the request URI
-               String topicFilter = CambriaConstants.kNoFilter;
-               if (null != req.getParameter("filter")) {
-                       topicFilter = req.getParameter("filter");
-               }
-               // pretty to print the messaages in new line
-               String prettyval = "0";
-               String strPretty = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "pretty");
-               if (null != strPretty)
-                       prettyval = strPretty;
-
-               String metaval = "0";
-               String strmeta = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "meta");
-               if (null != strmeta)
-                       metaval = strmeta;
-
-               final boolean pretty = rrConvertor.convertToBooleanBroad(prettyval);
-               // withMeta to print offset along with message
-               final boolean withMeta = rrConvertor.convertToBooleanBroad(metaval);
-
                final LogWrap logger = new LogWrap(topic, consumerGroup, clientId);
-               logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
+               final String remoteHost = req.getRemoteHost();
+               ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
+                       .withTopic(topic).withConsumerGroup(consumerGroup).withClient(clientId).withRemoteHost(remoteHost).build();
 
-               // is this user allowed to read this topic?
-               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
-               final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+               validateIpBlacklist(errRespProvider, ctx);
 
-               if (metatopic == null) {
-                       // no such topic.
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.RESOURCE_NOT_FOUND.getResponseCode(),
-                                       errorMessages.getTopicNotExist() + "-[" + topic + "]", null, Utils.getFormattedDate(new Date()),
-                                       topic, null, null, consumerGroup + "/" + clientId, ctx.getRequest().getRemoteHost());
-                       LOG.info(errRes.toString());
-                       throw new CambriaApiException(errRes);
-               }
-               String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "metrics.send.cambria.topic");
-               if (null == metricTopicname)
-                       metricTopicname = "msgrtr.apinode.metrics.dmaap";
-               
-               boolean topicNameEnforced = false;
-               String topicNameStd = null;
-               topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
-                               "enforced.topic.name.AAF");
-               if (null != topicNameStd && topic.startsWith(topicNameStd)) {
-                       topicNameEnforced = true;
+               final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+               if (metaTopic == null) {
+                       throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
                }
 
-               if (null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
-                       if (null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))) {
-                               // check permissions
-                               metatopic.checkUserRead(user);
-                       }
-               }
-               // if headers are not provided then user will be null
-               if (topicNameEnforced ||(user == null && null != ctx.getRequest().getHeader("Authorization"))) {
-                       // the topic name will be sent by the client
-                       
-                       DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-                       String permission = aaf.aafPermissionString(topic, "sub");
-                       if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
-                               ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                               DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                               errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2() + topic + " on "
-                                                               + permission,
-                                               null, Utils.getFormattedDate(new Date()), topic, null, null, consumerGroup + "/" + clientId,
-                                               ctx.getRequest().getRemoteHost());
-                               LOG.info(errRes.toString());
-                               throw new DMaaPAccessDeniedException(errRes);
+               boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, SUBSCRIBE_ACTION);
 
-                       }
-                       isAAFTopic = true;
-               }
                final long elapsedMs1 = System.currentTimeMillis() - startTime;
                logger.info("Time taken in getEvents Authorization " + elapsedMs1 + " ms for " + topic + " " + consumerGroup
                                + " " + clientId);
-               Consumer c = null;
-               
-               String lhostId = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "clusterhostid");
-               if (null == lhostId) {
-                       try {
-                               lhostId = InetAddress.getLocalHost().getCanonicalHostName();
-                       } catch (UnknownHostException e) {
-                               LOG.info("Unknown Host Exception error occured while getting getting hostid");
-                       }
 
-               }
-                CambriaOutboundEventStream coes = null;
+               verifyHostId();
+               final boolean pretty = isPrettyPrintEnabled();
+               final boolean withMeta = isMetaOffsetEnabled();
+               int timeoutMs = getMessageTimeout(req);
+               int limit = getMessageLimit(req);
+               String topicFilter = (null != req.getParameter("filter")) ? req.getParameter("filter") : CambriaConstants.kNoFilter;
+               logger.info("fetch: timeout=" + timeoutMs + ", limit=" + limit + ", filter=" + topicFilter + " from Remote host "+ctx.getRequest().getRemoteHost());
+
+               Consumer consumer = null;
                try {
                        final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
                        final DMaaPCambriaLimiter rl = ctx.getConfigReader().getfRateLimiter();
-                       rl.onCall(topic, consumerGroup, clientId, ctx.getRequest().getRemoteHost());
-                       c = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
-                                       ctx.getRequest().getRemoteHost());
-                       coes = new CambriaOutboundEventStream.Builder(c).timeout(timeoutMs)
+                       rl.onCall(topic, consumerGroup, clientId, remoteHost);
+                       consumer = ctx.getConfigReader().getfConsumerFactory().getConsumerFor(topic, consumerGroup, clientId, timeoutMs,
+                                       remoteHost);
+                       CambriaOutboundEventStream coes = new CambriaOutboundEventStream.Builder(consumer).timeout(timeoutMs)
                                        .limit(limit).filter(topicFilter).pretty(pretty).withMeta(withMeta).build();
                        coes.setDmaapContext(ctx);
-                       coes.setTopic(metatopic);
-                       if (isTransEnabled() || isAAFTopic) {
-                               coes.setTransEnabled(true);
-                       } else {
-                               coes.setTransEnabled(false);
-                       }
+                       coes.setTopic(metaTopic);
+                       coes.setTransEnabled(isTransEnabled() || isAAFTopic);
                        coes.setTopicStyle(isAAFTopic);
                        final long elapsedMs2 = System.currentTimeMillis() - startTime;
                        logger.info("Time taken in getEvents getConsumerFor " + elapsedMs2 + " ms for " + topic + " "
                                        + consumerGroup + " " + clientId);
 
-                       DMaaPResponseBuilder.setNoCacheHeadings(ctx);
-               
-                       DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
+                       respondOkWithStream(ctx, coes);
                        // No IOException thrown during respondOkWithStream, so commit the
                        // new offsets to all the brokers
-                       c.commitOffsets();
+                       consumer.commitOffsets();
                        final int sent = coes.getSentCount();
-
-                        metricsSet.consumeTick(sent);
-                    rl.onSend(topic, consumerGroup, clientId, sent);
+                       metricsSet.consumeTick(sent);
+                       rl.onSend(topic, consumerGroup, clientId, sent);
                        final long elapsedMs = System.currentTimeMillis() - startTime;
-                       logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + c.getOffset() + " for "
+                       logger.info("Sent " + sent + " msgs in " + elapsedMs + " ms; committed to offset " + consumer.getOffset() + " for "
                                        + topic + " " + consumerGroup + " " + clientId + " on to the server "
-                                       + ctx.getRequest().getRemoteHost());
+                                       + remoteHost);
 
                } catch (UnavailableException excp) {
                        logger.warn(excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
-                                       DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
-                                       errorMessages.getServerUnav() + excp.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
-                                       null, null, consumerGroup + "-" + clientId, ctx.getRequest().getRemoteHost());
+                       ErrorResponse errRes = errRespProvider.getServiceUnavailableError(excp.getMessage());
                        LOG.info(errRes.toString());
                        throw new CambriaApiException(errRes);
 
-               }  catch (java.util.ConcurrentModificationException excp1) {
-                       LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+ctx.getRequest().getRemoteHost());
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
-                                       DMaaPResponseCode.TOO_MANY_REQUESTS.getResponseCode(),
-                                       "Couldn't respond to client, possible of consumer requests from more than one server. Please contact MR team if you see this issue occurs continously", null,
-                                       Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
+               } catch (ConcurrentModificationException excp1) {
+                       LOG.info(excp1.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId + " from Remote"+remoteHost);
+                       ErrorResponse errRes = errRespProvider.getConcurrentModificationError();
                        logger.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                        
-               } catch (CambriaApiException excp) {
-                       LOG.info(excp.getMessage() + "on " + topic + " " + consumerGroup + " ****** " + clientId);
-                       
-                       throw excp;
-               }
-               catch (Exception excp) {
-                       // System.out.println(excp + "------------------ " + topic+"
-                       // "+consumerGroup+" "+clientId);
-
+               } catch (Exception excp) {
                        logger.info("Couldn't respond to client, closing cambria consumer " + " " + topic + " " + consumerGroup
                                        + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE + " ****** " + excp);
-                                       
-                               ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
-
-                       
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_SERVICE_UNAVAILABLE,
-                                       DMaaPResponseCode.SERVER_UNAVAILABLE.getResponseCode(),
-                                       "Couldn't respond to client, closing cambria consumer" + excp.getMessage(), null,
-                                       Utils.getFormattedDate(new Date()), topic, null, null, clientId, ctx.getRequest().getRemoteHost());
+                       ctx.getConfigReader().getfConsumerFactory().destroyConsumer(topic, consumerGroup, clientId);
+                       ErrorResponse errRes = errRespProvider.getGenericError(excp.getMessage());
                        logger.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                } finally {
-                       coes = null;
-                       // If no cache, close the consumer now that we're done with it.
-                       boolean kSetting_EnableCache = ConsumerFactory.kDefault_IsCacheEnabled;
-                       String strkSetting_EnableCache = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                                       ConsumerFactory.kSetting_EnableCache);
-                       if (null != strkSetting_EnableCache)
-                               kSetting_EnableCache = Boolean.parseBoolean(strkSetting_EnableCache);
-                       // if
-                       // (!ctx.getConfigReader().getSettings().getBoolean(ConsumerFactory.kSetting_EnableCache,
-                       // ConsumerFactory.kDefault_IsCacheEnabled) && (c != null)) {
-                       if (!kSetting_EnableCache && (c != null)) {
+                       if (consumer != null && !isCacheEnabled()) {
                                try {
-                                       c.close();
+                                       consumer.close();
                                } catch (Exception e) {
-                                       logger.info("***Exception occured in getEvents finaly block while closing the consumer " + " "
+                                       logger.info("***Exception occurred in getEvents finally block while closing the consumer " + " "
                                                        + topic + " " + consumerGroup + " " + clientId + " " + HttpStatus.SC_SERVICE_UNAVAILABLE
                                                        + " " + e);
                                }
@@ -352,117 +212,155 @@ public class EventsServiceImpl implements EventsService {
                }
        }
 
-       /**
-        * @throws missingReqdSetting
-        * 
-        */
-       @Override
-       public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
-                       final String requestTime) throws ConfigDbException, AccessDeniedException, TopicExistsException,
-                       CambriaApiException, IOException, missingReqdSetting, DMaaPAccessDeniedException {
-
-               // is this user allowed to write to this topic?
-               final long startMs = System.currentTimeMillis();
-               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
-               final Topic metatopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
-               boolean isAAFTopic = false;
-
-               // was this host blacklisted?
+       private void validateIpBlacklist(ErrorResponseProvider errResponseProvider, DMaaPContext ctx) throws CambriaApiException {
                final String remoteAddr = Utils.getRemoteAddress(ctx);
-
                if (ctx.getConfigReader().getfIpBlackList().contains(remoteAddr)) {
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                       "Source address [" + remoteAddr + "] is blacklisted. Please contact the cluster management team.",
-                                       null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
-                                       ctx.getRequest().getRemoteHost(), null, null);
+                       ErrorResponse errRes = errResponseProvider.getIpBlacklistedError(remoteAddr);
                        LOG.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                }
+       }
 
-               String topicNameStd = null;
+       private boolean authorizeClientWhenNeeded(DMaaPContext ctx, Topic metaTopic, String topicName,
+               ErrorResponseProvider errRespProvider, String action) throws CambriaApiException, AccessDeniedException {
 
-               // topicNameStd=
-               
-               topicNameStd = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
-                               "enforced.topic.name.AAF");
-               String metricTopicname = com.att.ajsc.filemonitor.AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "metrics.send.cambria.topic");
-               if (null == metricTopicname)
-                       metricTopicname = "msgrtr.apinode.metrics.dmaap";
-               boolean topicNameEnforced = false;
-               if (null != topicNameStd && topic.startsWith(topicNameStd)) {
-                       topicNameEnforced = true;
+               boolean isAAFTopic = false;
+               String metricTopicName = getMetricTopicName();
+               if(!metricTopicName.equalsIgnoreCase(topicName)) {
+                       if(isCadiEnabled() && isTopicNameEnforcedAaf(topicName)) {
+                               isAAFTopic = true;
+                               DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+                               String permission = aaf.aafPermissionString(topicName, action);
+                               if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
+                                       ErrorResponse errRes = errRespProvider.getAafAuthorizationError(permission, action);
+                                       LOG.info(errRes.toString());
+                                       throw new DMaaPAccessDeniedException(errRes);
+
+                               }
+                       } else if( null != metaTopic.getOwner() && !metaTopic.getOwner().isEmpty()) {
+                               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(ctx);
+                               if(SUBSCRIBE_ACTION.equals(action)) {
+                                       metaTopic.checkUserRead(user);
+                               } else if(PUBLISH_ACTION.equals(action)) {
+                                       metaTopic.checkUserWrite(user);
+                               }
+                       }
                }
+               return isAAFTopic;
+       }
+
+       boolean isCadiEnabled() {
+               return Utils.isCadiEnabled();
+       }
 
-               // Here check if the user has rights to publish on the topic
-               // ( This will be called when no auth is added or when UEB API Key
-               // Authentication is used)
-               // checkUserWrite(user) method will throw an error when there is no Auth
-               // header added or when the
-               // user has no publish rights
+       void respondOkWithStream(DMaaPContext ctx, StreamWriter coes) throws IOException{
+               DMaaPResponseBuilder.setNoCacheHeadings(ctx);
+               DMaaPResponseBuilder.respondOkWithStream(ctx, MediaType.APPLICATION_JSON, coes);
+       }
 
-               if (null != metatopic && null != metatopic.getOwner() && !("".equals(metatopic.getOwner()))
-                               && null == ctx.getRequest().getHeader("Authorization") && !topic.equalsIgnoreCase(metricTopicname)) {
-                       metatopic.checkUserWrite(user);
-               }
+       private int getMessageLimit(HttpServletRequest request) {
+               return NumberUtils.toInt(request.getParameter("limit"), CambriaConstants.kNoLimit);
+       }
 
-               // if headers are not provided then user will be null
-               if (topicNameEnforced || (user == null && null != ctx.getRequest().getHeader("Authorization")
-                               && !topic.equalsIgnoreCase(metricTopicname))) {
-                       // the topic name will be sent by the client
-               
-                       DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-                       String permission = aaf.aafPermissionString(topic, "pub");
-                       if (!aaf.aafAuthentication(ctx.getRequest(), permission)) {
-                               ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                               DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                               errorMessages.getNotPermitted1() + " publish " + errorMessages.getNotPermitted2() + topic
-                                                               + " on " + permission,
-                                               null, Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(ctx.getRequest()),
-                                               ctx.getRequest().getRemoteHost(), null, null);
-                               LOG.info(errRes.toString());
-                               throw new DMaaPAccessDeniedException(errRes);
+       private int getMessageTimeout(HttpServletRequest request) {
+               String timeoutMsAsString = getPropertyFromAJSCmap(TIMEOUT_PROPERTY);
+               int defaultTimeoutMs = timeoutMsAsString!=null ? NumberUtils.toInt(timeoutMsAsString, CambriaConstants.kNoTimeout) :
+                       CambriaConstants.kNoTimeout;
+
+               String timeoutProperty = request.getParameter(TIMEOUT_PROPERTY);
+               return timeoutProperty != null ? NumberUtils.toInt(timeoutProperty, defaultTimeoutMs) : defaultTimeoutMs;
+       }
+
+       private boolean isPrettyPrintEnabled() {
+               return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap("pretty"));
+       }
+
+       private boolean isMetaOffsetEnabled() {
+               return rrConvertor.convertToBooleanBroad(getPropertyFromAJSCmap( "meta"));
+       }
+
+       private boolean isTopicNameEnforcedAaf(String topicName) {
+               String topicNameStd = getPropertyFromAJSCmap("enforced.topic.name.AAF");
+               return !topicNameStd.isEmpty() && topicName.startsWith(topicNameStd);
+       }
+
+       private boolean isCacheEnabled() {
+               String cachePropsSetting = getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache);
+               return !cachePropsSetting.isEmpty() ? Boolean.parseBoolean(cachePropsSetting) : ConsumerFactory.kDefault_IsCacheEnabled;
+       }
+
+       private void verifyHostId() {
+               String lhostId = getPropertyFromAJSCmap("clusterhostid");
+               if (lhostId.isEmpty()) {
+                       try {
+                               InetAddress.getLocalHost().getCanonicalHostName();
+                       } catch (UnknownHostException e) {
+                               LOG.warn("Unknown Host Exception error occurred while getting getting hostid", e);
                        }
-                       isAAFTopic = true;
+
                }
+       }
 
-               final HttpServletRequest req = ctx.getRequest();
+       private String getMetricTopicName() {
+               String metricTopicFromProps = getPropertyFromAJSCmap("metrics.send.cambria.topic");
+               return !metricTopicFromProps.isEmpty() ? metricTopicFromProps : "msgrtr.apinode.metrics.dmaap";
+       }
 
-               // check for chunked input
-               boolean chunked = false;
-               if (null != req.getHeader(TRANSFER_ENCODING)) {
-                       chunked = req.getHeader(TRANSFER_ENCODING).contains("chunked");
-               }
-               // get the media type, or set it to a generic value if it wasn't
-               // provided
-               String mediaType = req.getContentType();
-               if (mediaType == null || mediaType.length() == 0) {
-                       mediaType = MimeTypes.kAppGenericBinary;
-               }
+       /**
+        * @throws missingReqdSetting
+        * 
+        */
+       @Override
+       public void pushEvents(DMaaPContext ctx, final String topic, InputStream msg, final String defaultPartition,
+                       final String requestTime) throws ConfigDbException, AccessDeniedException,
+                       CambriaApiException, IOException, missingReqdSetting {
 
-               if (mediaType.contains("charset=UTF-8")) {
-                       mediaType = mediaType.replace("; charset=UTF-8", "").trim();
-               }
+               final long startMs = System.currentTimeMillis();
+               String remoteHost = ctx.getRequest().getRemoteHost();
+               ErrorResponseProvider errRespProvider = new ErrorResponseProvider.Builder().withErrorMessages(errorMessages)
+                       .withTopic(topic).withRemoteHost(remoteHost).withPublisherIp(remoteHost)
+                       .withPublisherId(Utils.getUserApiKey(ctx.getRequest())).build();
 
-               String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "transidUEBtopicreqd");
-               boolean istransidreqd = false;
-               if (null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true")) {
-                       istransidreqd = true;
+               validateIpBlacklist(errRespProvider, ctx);
+
+               final Topic metaTopic = ctx.getConfigReader().getfMetaBroker().getTopic(topic);
+               if (metaTopic == null) {
+                       throw new CambriaApiException(errRespProvider.getTopicNotFoundError());
                }
 
-               if (isAAFTopic || istransidreqd) {
+               final boolean isAAFTopic = authorizeClientWhenNeeded(ctx, metaTopic, topic, errRespProvider, PUBLISH_ACTION);
+
+               final HttpServletRequest req = ctx.getRequest();
+               boolean chunked = isRequestedChunk(req);
+               String mediaType = getMediaType(req);
+               boolean transactionRequired = isTransactionIdRequired();
+
+               if (isAAFTopic || transactionRequired) {
                        pushEventsWithTransaction(ctx, msg, topic, defaultPartition, requestTime, chunked, mediaType);
                } else {
                        pushEvents(ctx, topic, msg, defaultPartition, chunked, mediaType);
                }
+
                final long endMs = System.currentTimeMillis();
                final long totalMs = endMs - startMs;
-
                LOG.info("Overall Response time - Published " + " msgs in " + totalMs + " ms for topic " + topic);
+       }
+
+       private boolean isRequestedChunk(HttpServletRequest request) {
+               return null != request.getHeader(TRANSFER_ENCODING) &&
+                       request.getHeader(TRANSFER_ENCODING).contains("chunked");
+       }
+
+       private String getMediaType(HttpServletRequest request) {
+               String mediaType = request.getContentType();
+               if (mediaType == null || mediaType.length() == 0) {
+                       return MimeTypes.kAppGenericBinary;
+               }
+               return mediaType.replace("; charset=UTF-8", "").trim();
+       }
 
+       private boolean isTransactionIdRequired() {
+               return getPropertyFromAJSCmap("transidUEBtopicreqd").equalsIgnoreCase("true");
        }
 
        /**
@@ -481,7 +379,7 @@ public class EventsServiceImpl implements EventsService {
         */
        private void pushEvents(DMaaPContext ctx, String topic, InputStream msg, String defaultPartition, boolean chunked,
                        String mediaType)
-                       throws ConfigDbException, AccessDeniedException, TopicExistsException, CambriaApiException, IOException {
+                       throws ConfigDbException, AccessDeniedException, CambriaApiException, IOException {
                final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
                // setup the event set
                final CambriaEventSet events = new CambriaEventSet(mediaType, msg, chunked, defaultPartition);
@@ -490,8 +388,8 @@ public class EventsServiceImpl implements EventsService {
                final long startMs = System.currentTimeMillis();
                long count = 0;
                long maxEventBatch = 1024L* 16;
-               String batchlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
-               if (null != batchlen)
+               String batchlen = getPropertyFromAJSCmap( BATCH_LENGTH);
+               if (null != batchlen && !batchlen.isEmpty())
                        maxEventBatch = Long.parseLong(batchlen);
                // long maxEventBatch =
                
@@ -550,7 +448,7 @@ public class EventsServiceImpl implements EventsService {
                        final JSONObject response = new JSONObject();
                        response.put("count", count);
                        response.put("serverTimeMs", totalMs);
-                       DMaaPResponseBuilder.respondOk(ctx, response);
+                       respondOk(ctx, response);
 
                } catch (Exception excp) {
                        int status = HttpStatus.SC_NOT_FOUND;
@@ -590,7 +488,7 @@ public class EventsServiceImpl implements EventsService {
         */
        private void pushEventsWithTransaction(DMaaPContext ctx, InputStream inputStream, final String topic,
                        final String partitionKey, final String requestTime, final boolean chunked, final String mediaType)
-                       throws ConfigDbException, AccessDeniedException, TopicExistsException, IOException, CambriaApiException {
+                       throws ConfigDbException, AccessDeniedException, IOException, CambriaApiException {
 
                final MetricsSet metricsSet = ctx.getConfigReader().getfMetrics();
 
@@ -601,8 +499,8 @@ public class EventsServiceImpl implements EventsService {
                final long startMs = System.currentTimeMillis();
                long count = 0;
                long maxEventBatch = 1024L * 16;
-               String evenlen = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, BATCH_LENGTH);
-               if (null != evenlen)
+               String evenlen = getPropertyFromAJSCmap( BATCH_LENGTH);
+               if (null != evenlen && !evenlen.isEmpty())
                        maxEventBatch = Long.parseLong(evenlen);
                // final long maxEventBatch =
                
@@ -639,9 +537,9 @@ public class EventsServiceImpl implements EventsService {
 
                                responseTransactionId = m.getLogDetails().getTransactionId();
 
-                               JSONObject jsonObject = new JSONObject();
-                               jsonObject.put("msgWrapMR", m.getMessage());
-                               jsonObject.put("transactionId", responseTransactionId);
+                               //JSONObject jsonObject = new JSONObject();
+                               //jsonObject.put("msgWrapMR", m.getMessage());
+                               //jsonObject.put("transactionId", responseTransactionId);
                                // final KeyedMessage<String, String> data = new
                                // KeyedMessage<String, String>(topic, m.getKey(),
                        
@@ -755,8 +653,9 @@ public class EventsServiceImpl implements EventsService {
                        // build a response
                        final JSONObject response = new JSONObject();
                        response.put("count", count);
+                       response.put("transactionId", responseTransactionId);
                        response.put("serverTimeMs", totalMs);
-                       DMaaPResponseBuilder.respondOk(ctx, response);
+                       respondOk(ctx, response);
 
                } catch (Exception excp) {
                        int status = HttpStatus.SC_NOT_FOUND;
@@ -798,6 +697,10 @@ public class EventsServiceImpl implements EventsService {
                msg.setLogDetails(logDetails);
        }
 
+       void respondOk(DMaaPContext ctx, JSONObject response) throws IOException {
+               DMaaPResponseBuilder.respondOk(ctx, response);
+       }
+
        /**
         * 
         * @author anowarul.islam
@@ -837,8 +740,7 @@ public class EventsServiceImpl implements EventsService {
        }
 
        public boolean isTransEnabled() {
-               String istransidUEBtopicreqd = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "transidUEBtopicreqd");
+               String istransidUEBtopicreqd = getPropertyFromAJSCmap("transidUEBtopicreqd");
                boolean istransidreqd = false;
                if ((null != istransidUEBtopicreqd && istransidUEBtopicreqd.equalsIgnoreCase("true"))) {
                        istransidreqd = true;
@@ -863,13 +765,5 @@ public class EventsServiceImpl implements EventsService {
                return logDetails;
        }
 
-       
-        
-       
-        
-       
-        
-       
-
 
 }
\ No newline at end of file
index 3048251..40e6840 100644 (file)
@@ -23,6 +23,7 @@ package org.onap.dmaap.dmf.mr.utils;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.security.Principal;
 import java.text.DecimalFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -46,7 +47,9 @@ public class Utils {
 
        private static final String DATE_FORMAT = "dd-MM-yyyy::hh:mm:ss:SSS";
        public static final String CAMBRIA_AUTH_HEADER = "X-CambriaAuth";
+       private static final String AUTH_HEADER = "Authorization";
        private static final String BATCH_ID_FORMAT = "000000";
+       private static final String X509_ATTR = "javax.servlet.request.X509Certificate";
        private static final EELFLogger log = EELFManager.getInstance().getLogger(Utils.class);
 
        private Utils() {
@@ -75,15 +78,21 @@ public class Utils {
                if (null != auth) {
                        final String[] splittedAuthKey = auth.split(":");
                        return splittedAuthKey[0];
-               }else if (null!=request.getHeader("Authorization")){
+               }else if (null != request.getHeader(AUTH_HEADER) || null != request.getAttribute(X509_ATTR)){
                        /**
                         * AAF implementation enhancement
                         */
-                        String user= request.getUserPrincipal().getName().toString();
-                       return user.substring(0, user.lastIndexOf("@"));
+                       Principal principal = request.getUserPrincipal();
+                       if(principal != null){
+                               String name = principal.getName();
+                               return name.substring(0, name.lastIndexOf('@'));
+                       }
+                       log.warn("No principal has been provided on HTTP request");
                }
                return null;
        }
+
+
        /**
         * to format the batch sequence id
         * @param batchId
diff --git a/src/test/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImplTest.java b/src/test/java/org/onap/dmaap/dmf/mr/security/DMaaPAAFAuthenticatorImplTest.java
new file mode 100644 (file)
index 0000000..0b8829c
--- /dev/null
@@ -0,0 +1,117 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dmaap.dmf.mr.security;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.BDDMockito.given;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.springframework.mock.web.MockHttpServletRequest;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DMaaPAAFAuthenticatorImplTest {
+
+    private MockHttpServletRequest request;
+    @Spy
+    private DMaaPAAFAuthenticatorImpl aafAuthorizer;
+
+    @Before
+    public void setUp() throws Exception {
+        request = new MockHttpServletRequest();
+    }
+
+
+    @Test
+    public void aafAuthentication_shouldSuccess_whenRequestIsConfiguredWithProperUserRole() {
+        //given
+        String userRole = "org.onap.dmaap.mr.topic|:topic.org.onap.dmaap.mr.aSimpleTopic|sub";
+        request.addUserRole(userRole);
+
+        //when
+        boolean isAuthorized = aafAuthorizer.aafAuthentication(request, userRole);
+
+        //then
+        assertTrue(isAuthorized);
+    }
+
+    @Test
+    public void aafAuthentication_shouldFail_whenRequestIsConfiguredWithProperUserRole() {
+        //given
+        String userRole = "org.onap.dmaap.mr.topic|:topic.org.onap.dmaap.mr.aSimpleTopic|pub";
+
+        //when
+        boolean isAuthorized = aafAuthorizer.aafAuthentication(request, userRole);
+
+        //then
+        assertFalse(isAuthorized);
+    }
+
+    @Test
+    public void getPermissionAsString_shouldReturnValidTopicPermission_whenTopicWithNamespace() throws Exception {
+        //given
+        String topicPermission = "org.onap.dmaap.mr.topic|:topic.org.onap.dmaap.mr.aSimpleTopic|pub";
+        String topicName = "org.onap.dmaap.mr.aSimpleTopic";
+        String operation = "pub";
+
+        //when
+        String resultPem = aafAuthorizer.aafPermissionString(topicName, operation);
+
+        //then
+        assertEquals(topicPermission, resultPem);
+    }
+
+    @Test
+    public void getPermissionAsString_shouldReturnValidTopicPermission_whenTopicWithoutNamespace() throws Exception {
+        //given
+        String topicPermission = "org.onap.dmaap.mr.topic|:topic.topicName|pub";
+        String topicName = "topicName";
+        String operation = "pub";
+
+        //when
+        String resultPem = aafAuthorizer.aafPermissionString(topicName, operation);
+
+        //then
+        assertEquals(topicPermission, resultPem);
+    }
+
+    @Test
+    public void getPermissionAsString_shouldReturnValidTopicPermission_whenNamespaceReadFromProperty() throws Exception {
+        //given
+        String topicPermission = "com.custom.ns.topic|:topic.topicName|pub";
+        String topicName = "topicName";
+        String operation = "pub";
+        String customNamespace = "com.custom.ns";
+        given(aafAuthorizer.readNamespaceFromProperties()).willReturn(customNamespace);
+
+        //when
+        String resultPem = aafAuthorizer.aafPermissionString(topicName, operation);
+
+        //then
+        assertEquals(topicPermission, resultPem);
+    }
+
+
+}
diff --git a/src/test/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImplTest.java b/src/test/java/org/onap/dmaap/dmf/mr/service/impl/EventsServiceImplTest.java
new file mode 100644 (file)
index 0000000..4abbe89
--- /dev/null
@@ -0,0 +1,598 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP Policy Engine
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dmaap.dmf.mr.service.impl;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.att.nsa.limits.Blacklist;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import javax.servlet.http.HttpServletRequest;
+import joptsimple.internal.Strings;
+import org.apache.http.HttpStatus;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.json.JSONObject;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.onap.dmaap.dmf.mr.CambriaApiException;
+import org.onap.dmaap.dmf.mr.backends.Consumer;
+import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
+import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
+import org.onap.dmaap.dmf.mr.backends.MetricsSet;
+import org.onap.dmaap.dmf.mr.backends.Publisher;
+import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
+import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
+import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
+import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
+import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
+import org.onap.dmaap.dmf.mr.metabroker.Topic;
+import org.onap.dmaap.dmf.mr.resources.CambriaOutboundEventStream;
+import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
+import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
+import org.springframework.mock.web.MockHttpServletRequest;
+import org.springframework.mock.web.MockHttpServletResponse;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EventsServiceImplTest {
+
+    private InputStream iStream = null;
+    private DMaaPContext dMaapContext = new DMaaPContext();
+    private DMaaPErrorMessages pErrorMessages = new DMaaPErrorMessages();
+    @Mock
+    private ConfigurationReader configurationReader;
+    @Mock
+    private Blacklist blacklist;
+    @Mock
+    private DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator;
+    @Mock
+    private NsaSimpleApiKey nsaSimpleApiKey;
+    @Mock
+    private DMaaPKafkaMetaBroker dmaapKafkaMetaBroker;
+    @Mock
+    private Topic createdTopic;
+    @Mock
+    private ConsumerFactory factory;
+    @Mock
+    private Consumer consumer;
+    @Mock
+    private Publisher publisher;
+    @Mock
+    private DMaaPCambriaLimiter limiter;
+    @Mock
+    private MetricsSet metrics;
+    @Spy
+    private EventsServiceImpl eventsService;
+
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    private MockHttpServletRequest request;
+
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+        String source = "source of my InputStream";
+        iStream = new ByteArrayInputStream(source.getBytes("UTF-8"));
+
+        request = new MockHttpServletRequest();
+        MockHttpServletResponse response = new MockHttpServletResponse();
+        dMaapContext.setRequest(request);
+        dMaapContext.setResponse(response);
+        when(blacklist.contains(anyString())).thenReturn(false);
+        when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
+        when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        dMaapContext.setConfigReader(configurationReader);
+        eventsService.setErrorMessages(pErrorMessages);
+        doReturn("100").when(eventsService).getPropertyFromAJSCmap("timeout");
+    }
+
+    @Test
+    public void getEvents_shouldFailOnAafAuthorization() throws Exception {
+        String topicPrefix = "org.onap.aaf.enforced";
+        String topicName = topicPrefix + ".topicName";
+        when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+        when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
+        when(eventsService.isCadiEnabled()).thenReturn(true);
+
+        thrown.expect(DMaaPAccessDeniedException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED)));
+
+        eventsService.getEvents(dMaapContext, topicName, "CG1", "23");
+    }
+
+    @Test
+    public void getEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception {
+        String remoteIp = "10.154.17.115";
+        request.setRemoteAddr(remoteIp);
+        when(blacklist.contains(remoteIp)).thenReturn(true);
+        when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN)));
+
+        eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
+    }
+
+    @Test
+    public void getEvents_shouldFail_whenRequestedTopicNotExists() throws Exception {
+        when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null);
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
+
+        eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
+    }
+
+    @Test
+    public void getEvents_shouldFail_whenConsumerLockCannotBeAcquired() throws Exception {
+        //given
+        String topicName = "testTopic345";
+        String consumerGroup = "CG5";
+        String clientId = "13";
+        when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(configurationReader.getfRateLimiter()).thenReturn(limiter);
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+        when(configurationReader.getfConsumerFactory()).thenReturn(factory);
+        when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
+        doThrow(new UnavailableException("Could not acquire consumer lock")).when(factory)
+            .getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)));
+
+        //when
+        eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
+
+        //then
+        verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
+
+    }
+
+    @Test
+    public void getEvents_shouldFail_whenBrokerServicesAreUnavailable() throws Exception {
+        String topicName = "testTopic";
+        String consumerGroup = "CG1";
+        String clientId = "23";
+        when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null);
+        when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+        when(configurationReader.getfConsumerFactory()).thenReturn(factory);
+
+        givenUserAuthorizedWithAAF(request, topicName, "sub");
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_SERVICE_UNAVAILABLE)));
+
+        //when
+        eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
+
+        //then
+        verify(factory).destroyConsumer(topicName, consumerGroup, clientId);
+    }
+
+    private void givenUserAuthorizedWithAAF(MockHttpServletRequest request, String topicName, String operation) {
+        String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|" + operation;
+        request.addUserRole(permission);
+    }
+
+    @Test
+    public void getEvents_shouldHandleConcurrentModificationError() throws Exception {
+        String testTopic = "testTopic";
+        when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic(testTopic)).thenReturn(createdTopic);
+        when(configurationReader.getfConsumerFactory()).thenReturn(factory);
+        when(configurationReader.getfRateLimiter()).thenThrow(new ConcurrentModificationException("Error occurred"));
+        givenUserAuthorizedWithAAF(request, testTopic, "sub");
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_CONFLICT)));
+
+        eventsService.getEvents(dMaapContext, "testTopic", "CG1", "23");
+    }
+
+    @Test
+    public void getEvents_shouldNotAuthorizeClient_whenSubscribingToMetricsTopic() throws Exception {
+        //given
+        HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
+        when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
+        dMaapContext.setRequest(permittedRequest);
+        String metricsTopicName = "msgrtr.apinode.metrics.dmaap";
+        String consumerGroup = "CG5";
+        String clientId = "7";
+        givenConfiguredWithMocks(metricsTopicName);
+        when(factory.getConsumerFor(eq(metricsTopicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
+            .thenReturn(consumer);
+        doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
+
+        //when
+        eventsService.getEvents(dMaapContext, metricsTopicName, consumerGroup, clientId);
+
+        //then
+        verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
+        verify(dmaaPAuthenticator, never()).authenticate(dMaapContext);
+        verify(permittedRequest, never()).isUserInRole(anyString());
+    }
+
+    @Test
+    public void getEvents_shouldNotAuthorizeClient_whenTopicNoteEnforcedWithAaf_andTopicHasNoOwnerSet()
+        throws Exception {
+        //given
+        String topicName = "someSimpleTopicName";
+        String consumerGroup = "CG5";
+        String clientId = "7";
+        HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
+        when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
+        dMaapContext.setRequest(permittedRequest);
+        givenConfiguredWithMocks(topicName);
+        when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
+            .thenReturn(consumer);
+        doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
+        when(createdTopic.getOwner()).thenReturn(Strings.EMPTY);
+
+        //when
+        eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
+
+        //then
+        verify(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
+        verify(dmaaPAuthenticator, never()).authenticate(dMaapContext);
+        verify(permittedRequest, never()).isUserInRole(anyString());
+    }
+
+    @Test
+    public void getEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoReadPermissionToTopic()
+        throws Exception {
+        //given
+        String topicName = "someSimpleTopicName";
+        String consumerGroup = "CG5";
+        String clientId = "7";
+        HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
+        when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
+        dMaapContext.setRequest(permittedRequest);
+        givenConfiguredWithMocks(topicName);
+        when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
+            .thenReturn(consumer);
+        doNothing().when(eventsService).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
+        when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner");
+        when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
+        doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserRead(nsaSimpleApiKey);
+
+        thrown.expect(AccessDeniedException.class);
+
+        //when
+        eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
+
+        //then
+        verify(createdTopic).checkUserRead(nsaSimpleApiKey);
+        verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
+        verify(permittedRequest, never()).isUserInRole(anyString());
+    }
+
+
+    @Test
+    public void getEvents_shouldSuccessfullyRegisterConsumerToEventsStream_withAafAuthorization() throws Exception {
+        //given
+        String topicName = "testTopic";
+        String consumerGroup = "CG2";
+        String clientId = "6";
+        String messageLimit = "10";
+        String timeout = "25";
+        String meta = "yes";
+        String pretty = "on";
+        String cacheEnabled = "false";
+
+        givenConfiguredWithMocks(topicName);
+        givenConfiguredWithProperties(messageLimit, timeout, meta, pretty, cacheEnabled);
+        when(factory.getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString()))
+            .thenReturn(consumer);
+        givenUserAuthorizedWithAAF(request, topicName, "sub");
+
+        //when
+        eventsService.getEvents(dMaapContext, topicName, consumerGroup, clientId);
+
+        //then
+        ArgumentCaptor<CambriaOutboundEventStream> osWriter = ArgumentCaptor.forClass(CambriaOutboundEventStream.class);
+        verifyInvocationOrderForSuccessCase(topicName, consumerGroup, clientId, osWriter);
+        assertEventStreamProperties(osWriter.getValue(), messageLimit, timeout);
+    }
+
+    private void assertEventStreamProperties(CambriaOutboundEventStream stream, String messageLimit, String timeout) {
+        assertEquals(Integer.valueOf(messageLimit).intValue(), stream.getfLimit());
+        assertEquals(Integer.valueOf(timeout).intValue(), stream.getfTimeoutMs());
+        assertTrue(stream.isfWithMeta());
+        assertTrue(stream.isfPretty());
+    }
+
+    private void givenConfiguredWithProperties(String messageLimit, String timeout, String meta, String pretty,
+        String cacheEnabled) {
+        when(eventsService.getPropertyFromAJSCmap("meta")).thenReturn(meta);
+        when(eventsService.getPropertyFromAJSCmap("pretty")).thenReturn(pretty);
+        when(eventsService.getPropertyFromAJSCmap(ConsumerFactory.kSetting_EnableCache)).thenReturn(cacheEnabled);
+        request.addParameter("timeout", timeout);
+        request.addParameter("limit", messageLimit);
+    }
+
+    private void givenConfiguredWithMocks(String topicName) throws Exception {
+        when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(configurationReader.getfRateLimiter()).thenReturn(limiter);
+        when(configurationReader.getfMetrics()).thenReturn(metrics);
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+        when(configurationReader.getfConsumerFactory()).thenReturn(factory);
+        when(configurationReader.getfPublisher()).thenReturn(publisher);
+    }
+
+    private void verifyInvocationOrderForSuccessCase(String topicName, String consumerGroup, String clientId,
+        ArgumentCaptor<CambriaOutboundEventStream> osWriter) throws Exception {
+
+        InOrder inOrder = Mockito.inOrder(configurationReader, factory, metrics, limiter, consumer, eventsService);
+        inOrder.verify(configurationReader).getfMetrics();
+        inOrder.verify(configurationReader).getfRateLimiter();
+        inOrder.verify(limiter).onCall(eq(topicName), eq(consumerGroup), eq(clientId), anyString());
+        inOrder.verify(factory).getConsumerFor(eq(topicName), eq(consumerGroup), eq(clientId), anyInt(), anyString());
+        inOrder.verify(eventsService).respondOkWithStream(eq(dMaapContext), osWriter.capture());
+        inOrder.verify(consumer).commitOffsets();
+        inOrder.verify(metrics).consumeTick(anyInt());
+        inOrder.verify(limiter).onSend(eq(topicName), eq(consumerGroup), eq(clientId), anyLong());
+        inOrder.verify(consumer).close();
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void pushEvents_shouldFail_whenRemoteAddressIsBlacklisted() throws Exception {
+        String remoteIp = "10.132.64.112";
+        request.setRemoteAddr(remoteIp);
+        when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
+        when(blacklist.contains(anyString())).thenReturn(true);
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_FORBIDDEN)));
+
+        eventsService.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
+    }
+
+
+    @Test
+    public void pushEvents_shouldFail_whenRequestedTopicDoesNotExist() throws Exception {
+        when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null);
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
+
+        eventsService.pushEvents(dMaapContext, "testTopic", iStream, "5", "13:00:00");
+    }
+
+    @Test
+    public void pushEvents_shouldFailDmaapAuthorization_whenTopicOwnerIsSet_andUserHasNoWritePermissionToTopic()
+        throws Exception {
+        //given
+        String topicName = "someSimpleTopicName";
+
+        HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
+        when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
+        dMaapContext.setRequest(permittedRequest);
+        givenConfiguredWithMocks(topicName);
+        when(createdTopic.getOwner()).thenReturn("SimpleTopicOwner");
+        when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
+        doThrow(new AccessDeniedException("userName")).when(createdTopic).checkUserWrite(nsaSimpleApiKey);
+
+        thrown.expect(AccessDeniedException.class);
+
+        //when
+        eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
+
+        //then
+        verify(createdTopic).checkUserWrite(nsaSimpleApiKey);
+        verify(eventsService, never()).respondOkWithStream(eq(dMaapContext), any(CambriaOutboundEventStream.class));
+        verify(permittedRequest, never()).isUserInRole(anyString());
+    }
+
+    @Test
+    public void pushEvents_shouldFailOnAafAuthorization_whenCadiIsEnabled_topicNameEnforced_andUserHasNoPermission()
+        throws Exception {
+        //given
+        String topicPrefix = "org.onap.aaf.enforced";
+        String topicName = topicPrefix + ".topicName";
+        String permission = "org.onap.dmaap.mr.topic|:topic." + topicName + "|pub";
+        HttpServletRequest deniedRequest = mock(HttpServletRequest.class);
+        when(deniedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
+        when(deniedRequest.isUserInRole(permission)).thenReturn(false);
+        when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+        when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
+        when(eventsService.isCadiEnabled()).thenReturn(true);
+        dMaapContext.setRequest(deniedRequest);
+
+        thrown.expect(DMaaPAccessDeniedException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_UNAUTHORIZED)));
+
+        //when
+        eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
+
+        //then
+        verify(deniedRequest).isUserInRole(permission);
+    }
+
+
+    @Test
+    public void pushEvents_shouldPublishMessagesWithoutTransaction() throws Exception {
+        //given
+        String topicName = "topicWithoutTransaction";
+        givenConfiguredWithMocks(topicName);
+        doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
+
+        //when
+        eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
+
+        //then
+        verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
+        ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
+        verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
+        assertEquals(1, captor.getValue().getLong("count"));
+    }
+
+    @Test
+    public void pushEvents_shouldHandlePublisherError_whenPushWithoutTransaction() throws Exception {
+        //given
+        String topicName = "topicWithoutTransaction";
+        givenConfiguredWithMocks(topicName);
+        doThrow(new IOException()).when(publisher)
+            .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
+
+        //when
+        eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
+
+        //then
+        verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
+        verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class));
+    }
+
+
+    @Test
+    public void pushEvents_shouldPublishMessagesWithTransaction() throws Exception {
+        //given
+        String topicPrefix = "org.onap.dmaap.mr";
+        String topicName = topicPrefix + ".topicWithTransaction";
+        givenConfiguredWithMocks(topicName);
+        when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
+        when(eventsService.isCadiEnabled()).thenReturn(true);
+        doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
+
+        request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub");
+
+        //when
+        eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
+
+        //then
+        verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
+        ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
+        verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
+        assertEquals(1, captor.getValue().getLong("count"));
+        assertFalse(captor.getValue().getString("transactionId").isEmpty());
+    }
+
+    @Test
+    public void pushEvents_shouldHandlePublisherError_whenPushWithTransaction() throws Exception {
+        //given
+        String topicPrefix = "org.onap.dmaap.mr";
+        String topicName = topicPrefix + ".topicWithTransaction";
+        givenConfiguredWithMocks(topicName);
+        when(eventsService.getPropertyFromAJSCmap("enforced.topic.name.AAF")).thenReturn(topicPrefix);
+        when(eventsService.isCadiEnabled()).thenReturn(true);
+        doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
+        request.addUserRole("org.onap.dmaap.mr.topic|:topic." + topicName + "|pub");
+        doThrow(new IOException()).when(publisher)
+            .sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
+
+        thrown.expect(CambriaApiException.class);
+        thrown.expectMessage(containsString(String.valueOf(HttpStatus.SC_NOT_FOUND)));
+
+        //when
+        eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
+
+        //then
+        verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
+        verify(eventsService, never()).respondOk(any(DMaaPContext.class), any(JSONObject.class));
+    }
+
+    @Test
+    public void pushEvents_shouldNotPerformAnyAuthorization_whenPublishToMetricTopic() throws Exception {
+        //given
+        HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
+        when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
+        dMaapContext.setRequest(permittedRequest);
+        String metricsTopicName = "msgrtr.apinode.metrics.dmaap";
+        givenConfiguredWithMocks(metricsTopicName);
+        doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
+
+        //when
+        eventsService.pushEvents(dMaapContext, metricsTopicName, iStream, "5", "13:00:00");
+
+        //then
+        ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
+        verify(publisher)
+            .sendBatchMessageNew(eq(metricsTopicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
+        verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
+        verify(permittedRequest, never()).isUserInRole(anyString());
+        verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class));
+        assertEquals(1, captor.getValue().getLong("count"));
+    }
+
+    @Test
+    public void pushEvents_shouldNotPerformAnyAuthorization_whenTopicHasNoOwner() throws Exception {
+        //given
+        HttpServletRequest permittedRequest = mock(HttpServletRequest.class);
+        when(permittedRequest.getHeaders(anyString())).thenReturn(Collections.<String>emptyEnumeration());
+        dMaapContext.setRequest(permittedRequest);
+        String topicName = "notEnforcedAafTopic";
+        givenConfiguredWithMocks(topicName);
+        doNothing().when(eventsService).respondOk(eq(dMaapContext), any(JSONObject.class));
+        when(createdTopic.getOwner()).thenReturn(null);
+
+        //when
+        eventsService.pushEvents(dMaapContext, topicName, iStream, "5", "13:00:00");
+
+        //then
+        ArgumentCaptor<JSONObject> captor = ArgumentCaptor.forClass(JSONObject.class);
+        verify(publisher).sendBatchMessageNew(eq(topicName), Mockito.<ArrayList<ProducerRecord<String, String>>>any());
+        verify(eventsService).respondOk(eq(dMaapContext), captor.capture());
+        verify(permittedRequest, never()).isUserInRole(anyString());
+        verify(createdTopic, never()).checkUserWrite(any(NsaSimpleApiKey.class));
+        assertEquals(1, captor.getValue().getLong("count"));
+    }
+
+}
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/security/DMaaPAAFAuthenticatorImplTest.java b/src/test/java/org/onap/dmaap/mr/cambria/security/DMaaPAAFAuthenticatorImplTest.java
deleted file mode 100644 (file)
index 7019a2b..0000000
+++ /dev/null
@@ -1,83 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
- package org.onap.dmaap.mr.cambria.security;
-
-import static org.junit.Assert.*;
-
-import javax.servlet.http.HttpServletRequest;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.springframework.mock.web.MockHttpServletRequest;
-
-import org.onap.dmaap.dmf.mr.CambriaApiException;
-import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
-
-
-
-public class DMaaPAAFAuthenticatorImplTest {
-       
-       private MockHttpServletRequest request = null;
-       @Before
-       public void setUp() throws Exception {
-               //creating servlet object
-               request = new MockHttpServletRequest();
-               request.setServerName("www.example.com");
-               request.setRequestURI("/foo");
-               request.setQueryString("param1=value1&param");
-               String url = request.getRequestURL() + "?" + request.getQueryString(); 
-
-               
-       }
-
-       @After
-       public void tearDown() throws Exception {
-       }
-
-       @Test
-       public void testAafAuthentication() {
-               
-               DMaaPAAFAuthenticatorImpl authenticator = new DMaaPAAFAuthenticatorImpl();
-               authenticator.aafAuthentication(request, "admin");
-               assertTrue(true);
-               
-       }
-       
-       
-       
-       /*@Test
-       public void testAafPermissionString() {
-               
-               DMaaPAAFAuthenticatorImpl authenticator = new DMaaPAAFAuthenticatorImpl();
-               try {
-                       authenticator.aafPermissionString("testTopic", "admin");
-               } catch (CambriaApiException e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
-               }
-               
-               assertTrue(true);
-               
-       }*/
-       
-
-}
index 60ae849..ea3f051 100644 (file)
@@ -26,9 +26,10 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 import org.apache.log4j.Logger;
+import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImplTest;
 
 @RunWith(Suite.class)
-@SuiteClasses({ DMaaPAAFAuthenticatorImplTest.class,DMaaPAuthenticatorImplTest.class, 
+@SuiteClasses({ DMaaPAAFAuthenticatorImplTest.class,DMaaPAuthenticatorImplTest.class,
 })
 public class JUnitTestSuite {
        private static final Logger LOGGER = Logger.getLogger(JUnitTestSuite.class);
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/EventsServiceImplTest.java b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/EventsServiceImplTest.java
deleted file mode 100644 (file)
index 1e677d8..0000000
+++ /dev/null
@@ -1,312 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
- package org.onap.dmaap.mr.cambria.service.impl;
-
-import static org.mockito.Mockito.when;
-import static org.mockito.Matchers.anyString;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ConcurrentModificationException;
-import java.util.Map;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.mock.web.MockHttpServletRequest;
-import org.springframework.mock.web.MockHttpServletResponse;
-
-import com.att.ajsc.beans.PropertiesMapBean;
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import org.onap.dmaap.dmf.mr.CambriaApiException;
-import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
-import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
-import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
-import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
-import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
-import org.onap.dmaap.dmf.mr.beans.DMaaPCambriaLimiter;
-import org.onap.dmaap.dmf.mr.backends.ConsumerFactory;
-import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
-import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
-import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
-import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
-import org.onap.dmaap.dmf.mr.metabroker.Topic;
-import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
-import org.onap.dmaap.dmf.mr.service.impl.EventsServiceImpl;
-import org.onap.dmaap.dmf.mr.utils.PropertyReader;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
-import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
-import com.att.nsa.limits.Blacklist;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.security.NsaApiKey;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-
-import kafka.admin.AdminUtils;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ DMaaPAuthenticatorImpl.class, AJSCPropertiesMap.class })
-public class EventsServiceImplTest {
-
-       private InputStream iStream = null;
-       DMaaPContext dMaapContext = new DMaaPContext();
-       EventsServiceImpl service = new EventsServiceImpl();
-       DMaaPErrorMessages pErrorMessages = new DMaaPErrorMessages();
-       @Mock
-       ConfigurationReader configurationReader;
-       @Mock
-       Blacklist blacklist;
-       @Mock
-       DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator;
-       @Mock
-       DMaaPAAFAuthenticator dmaapAAFauthenticator;
-       @Mock
-       NsaApiKey user;
-       @Mock
-       NsaSimpleApiKey nsaSimpleApiKey;
-       @Mock
-       DMaaPKafkaMetaBroker dmaapKafkaMetaBroker;
-       @Mock
-       Topic createdTopic;
-       @Mock
-       ConsumerFactory factory;
-
-       @Before
-       public void setUp() throws Exception {
-               MockitoAnnotations.initMocks(this);
-               String source = "source of my InputStream";
-               iStream = new ByteArrayInputStream(source.getBytes("UTF-8"));
-
-               MockHttpServletRequest request = new MockHttpServletRequest();
-               MockHttpServletResponse response = new MockHttpServletResponse();
-               dMaapContext.setRequest(request);
-               dMaapContext.setResponse(response);
-               when(blacklist.contains(anyString())).thenReturn(false);
-               when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
-               dMaapContext.setConfigReader(configurationReader);
-
-               service.setErrorMessages(pErrorMessages);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout")).thenReturn("100");
-
-               AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "timeout");
-
-       }
-
-       @Test(expected = CambriaApiException.class)
-       public void testGetEvents() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException {
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-               service.getEvents(dMaapContext, "testTopic", "CG1", "23");
-       }
-
-       @Test(expected = CambriaApiException.class)
-       public void testGetEventsBlackListErr() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException {
-               when(blacklist.contains(anyString())).thenReturn(true);
-               when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
-               dMaapContext.setConfigReader(configurationReader);
-               service.getEvents(dMaapContext, "testTopic", "CG1", "23");
-       }
-
-       @Test(expected = CambriaApiException.class)
-       public void testGetEventsNoTopicError() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException {
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(null);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-               service.getEvents(dMaapContext, "testTopic", "CG1", "23");
-       }
-
-       @Test(expected = CambriaApiException.class)
-       public void testGetEventsuserNull() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException {
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-               MockHttpServletRequest mockRequest = new MockHttpServletRequest();
-               mockRequest.addHeader("Authorization", "passed");
-               dMaapContext.setRequest(mockRequest);
-               dMaapContext.getRequest().getHeader("Authorization");
-               service.getEvents(dMaapContext, "testTopic", "CG1", "23");
-       }
-
-       @Test(expected = CambriaApiException.class)
-       public void testGetEventsExcp2() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException {
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-               when(configurationReader.getfRateLimiter()).thenThrow(new ConcurrentModificationException("Error occurred"));
-               service.getEvents(dMaapContext, "testTopic", "CG1", "23");
-       }
-
-       @Test(expected = CambriaApiException.class)
-       public void testPushEvents() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting,
-                       invalidSettingValue, loadException {
-
-               // AdminUtils.createTopic(configurationReader.getZk(), "testTopic", 10,
-               // 1, new Properties());
-
-               configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader()));
-
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-
-               service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
-
-               service.getEvents(dMaapContext, "testTopic", "CG1", "23");
-
-               /*
-                * String trueValue = "True";
-                * assertTrue(trueValue.equalsIgnoreCase("True"));
-                */
-
-       }
-
-       @Test(expected = CambriaApiException.class)
-       public void testPushEventsBlackListedIp() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting,
-                       invalidSettingValue, loadException {
-
-               // AdminUtils.createTopic(configurationReader.getZk(), "testTopic", 10,
-               // 1, new Properties());
-               when(blacklist.contains(anyString())).thenReturn(true);
-               when(configurationReader.getfIpBlackList()).thenReturn(blacklist);
-               configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader()));
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-
-               service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
-
-       }
-
-       @Test(expected = NullPointerException.class)
-       public void testPushEventsNoUser() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting,
-                       invalidSettingValue, loadException {
-
-               configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader()));
-
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(null);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-               MockHttpServletRequest mockRequest = new MockHttpServletRequest();
-               mockRequest.addHeader("Authorization", "passed");
-               mockRequest.addHeader("Authorization", "passed");
-               dMaapContext.setRequest(mockRequest);
-               dMaapContext.getRequest().getHeader("Authorization");
-               service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
-
-       }
-
-       @Test(expected = CambriaApiException.class)
-       public void testPushEventsWtTransaction() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting,
-                       invalidSettingValue, loadException {
-
-               configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader()));
-
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-               when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "transidUEBtopicreqd")).thenReturn("true");
-
-               service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
-
-       }
-       
-       @Test(expected = CambriaApiException.class)
-       public void testPushEventsWtTransactionError() throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,
-                       TopicExistsException, AccessDeniedException, UnavailableException, IOException, missingReqdSetting,
-                       invalidSettingValue, loadException {
-
-               configurationReader.setfRateLimiter(new DMaaPCambriaLimiter(new PropertyReader()));
-
-               when(dmaaPAuthenticator.authenticate(dMaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configurationReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(configurationReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("testTopic")).thenReturn(createdTopic);
-               PowerMockito.when(configurationReader.getfConsumerFactory()).thenReturn(factory);
-               when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "transidUEBtopicreqd")).thenReturn("true");
-               when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "event.batch.length")).thenReturn("0");
-               when(configurationReader.getfPublisher()).thenThrow(new ConcurrentModificationException("Error occurred"));
-
-               service.pushEvents(dMaapContext, "testTopic", iStream, "3", "12:00:00");
-
-       }
-       
-       @Test
-       public void testIsTransEnabled1() {
-
-               when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "transidUEBtopicreqd")).thenReturn("true");
-                 assertTrue(service.isTransEnabled());
-
-       }
-       @Test
-       public void testIsTransEnabled2() {
-
-               when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                               "transidUEBtopicreqd")).thenReturn("false");
-                 assertFalse(service.isTransEnabled());
-
-       }
-
-}
index ec4b0e2..7536127 100644 (file)
@@ -25,12 +25,13 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 import org.apache.log4j.Logger;
+import org.onap.dmaap.dmf.mr.service.impl.EventsServiceImplTest;
 import org.onap.dmaap.dmf.mr.service.impl.TopicServiceImplTest;
 
 @RunWith(Suite.class)
 @SuiteClasses({ UIServiceImplTest.class, AdminServiceImplemTest.class, ApiKeysServiceImplTest.class, 
        ShowConsumerCacheTest.class,TopicServiceImplTest.class, TransactionServiceImplTest.class, MMServiceImplTest.class,
-       BaseTransactionDbImplTest.class,  MetricsServiceImplTest.class,EventsServiceImplTest.class})
+       BaseTransactionDbImplTest.class,  MetricsServiceImplTest.class, EventsServiceImplTest.class})
 public class JUnitTestSuite {
        private static final Logger LOGGER = Logger.getLogger(JUnitTestSuite.class);