TopicService authorization check refactor 63/90063/4
authorTomek Kaminski <tomasz.kaminski@nokia.com>
Tue, 18 Jun 2019 08:16:40 +0000 (10:16 +0200)
committerTomek Kaminski <tomasz.kaminski@nokia.com>
Wed, 10 Jul 2019 11:48:12 +0000 (13:48 +0200)
Issue-ID: DMAAP-1222
Change-Id: I67dce771e4377b4211622861ea91d79ae90c96c6
Signed-off-by: Tomek Kaminski <tomasz.kaminski@nokia.com>
src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java
src/test/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImplTest.java [new file with mode: 0644]
src/test/java/org/onap/dmaap/mr/cambria/service/impl/JUnitTestSuite.java
src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java [deleted file]

index 626828b..91fca9c 100644 (file)
@@ -1,17 +1,16 @@
-/**
- * 
- */
-/*******************************************************************************
+/*
  *  ============LICENSE_START=======================================================
  *  org.onap.dmaap
  *  ================================================================================
  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
  *  ================================================================================
+ *  Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
+ * =================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
-*  
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
  *  
- *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *  
- *******************************************************************************/
+ */
 package org.onap.dmaap.dmf.mr.service.impl;
 
+import com.att.ajsc.beans.PropertiesMapBean;
 import java.io.IOException;
 
+import java.security.Principal;
+import javax.servlet.http.HttpServletRequest;
+import joptsimple.internal.Strings;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.math.NumberUtils;
 import org.apache.http.HttpStatus;
 import org.json.JSONArray;
 import org.json.JSONException;
@@ -68,15 +70,11 @@ import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
 @Service
 public class TopicServiceImpl implements TopicService {
 
-       // private static final Logger LOGGER =
-       
+       private static final String TOPIC_CREATE_OP = "create";
        private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
        @Autowired
        private DMaaPErrorMessages errorMessages;
 
-       // @Value("${msgRtr.topicfactory.aaf}")
-       
-
        public DMaaPErrorMessages getErrorMessages() {
                return errorMessages;
        }
@@ -85,6 +83,30 @@ public class TopicServiceImpl implements TopicService {
                this.errorMessages = errorMessages;
        }
 
+
+  String getPropertyFromAJSCbean(String propertyKey) {
+               return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
+       }
+
+       String getPropertyFromAJSCmap(String propertyKey) {
+               return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
+       }
+
+       NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) {
+               return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+       }
+
+       void respondOk(DMaaPContext context, String msg) {
+               DMaaPResponseBuilder.respondOkWithHtml(context, msg);
+       }
+
+       void respondOk(DMaaPContext context, JSONObject json) throws IOException {
+               DMaaPResponseBuilder.respondOk(context, json);
+       }
+
+       boolean isCadiEnabled() {
+               return Utils.isCadiEnabled();
+       }
        /**
         * @param dmaapContext
         * @throws JSONException
@@ -106,7 +128,7 @@ public class TopicServiceImpl implements TopicService {
                json.put("topics", topicsList);
 
                LOGGER.info("Returning list of all the topics.");
-               DMaaPResponseBuilder.respondOk(dmaapContext, json);
+               respondOk(dmaapContext, json);
 
        }
 
@@ -136,7 +158,7 @@ public class TopicServiceImpl implements TopicService {
                json.put("topics", topicsList);
 
                LOGGER.info("Returning list of all the topics.");
-               DMaaPResponseBuilder.respondOk(dmaapContext, json);
+               respondOk(dmaapContext, json);
 
        }
 
@@ -171,7 +193,7 @@ public class TopicServiceImpl implements TopicService {
                        o.put("writerAcl", aclToJson(t.getWriterAcl()));
 
                LOGGER.info("Returning details of topic " + topicName);
-               DMaaPResponseBuilder.respondOk(dmaapContext, o);
+               respondOk(dmaapContext, o);
 
        }
 
@@ -188,150 +210,93 @@ public class TopicServiceImpl implements TopicService {
         * 
         */
        @Override
-       public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
-                       throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
-               LOGGER.info("Creating topic " + topicBean.getTopicName());
+       public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException {
+               String topicName = topicBean.getTopicName();
+               LOGGER.info("Creating topic {}",topicName);
+               String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
 
-               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-               String key = null;
-               String appName = dmaapContext.getRequest().getHeader("AppName");
-               String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
-                               "enforced.topic.name.AAF");
-
-               if (user != null) {
-                       key = user.getKey();
-
-                       if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
+               try {
+                       final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions");
+                       final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas");
 
-                               LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
+                       final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
+                               key, partitions, replicas, topicBean.isTransactionEnabled());
 
-                               ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                               DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                               "Failed to create topic: Access Denied.User does not have permission to perform create topic");
+                       LOGGER.info("Topic {} created successfully. Sending response", topicName);
+                       respondOk(dmaapContext, topicToJson(t));
+               } catch (JSONException ex) {
 
-                               LOGGER.info(errRes.toString());
-                               // throw new DMaaPAccessDeniedException(errRes);
+                       LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
+                                       DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
+                       LOGGER.info(errRes.toString());
+                       throw new CambriaApiException(errRes);
 
-                       }
-               }
-               // else if (user==null &&
-               // (null==dmaapContext.getRequest().getHeader("Authorization") && null
-               // == dmaapContext.getRequest().getHeader("cookie")) ) {
-               else if (Utils.isCadiEnabled()&&user == null && null == dmaapContext.getRequest().getHeader("Authorization")
-                               && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
-                       LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                       "Failed to create topic: Access Denied.User does not have permission to perform create topic");
+               } catch (ConfigDbException ex) {
 
+                       LOGGER.error("Failed to create topic "+ topicName +".  Config DB Exception", ex);
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
+                                       DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
                        LOGGER.info(errRes.toString());
-                       // throw new DMaaPAccessDeniedException(errRes);
+                       throw new CambriaApiException(errRes);
+               } catch (Broker1.TopicExistsException ex) {
+                       LOGGER.error( "Failed to create topic "+ topicName +".  Topic already exists.",ex);
                }
+       }
 
-               if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization")
-                               )) {
-                       // if (user == null &&
-                       // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
-                       // null != dmaapContext.getRequest().getHeader("cookie"))) {
-                       // ACL authentication is not provided so we will use the aaf
-                       // authentication
-                       LOGGER.info("Authorization the topic");
-
-                       String permission = "";
-                       String nameSpace = "";
-                       if (topicBean.getTopicName().indexOf(".") > 1)
-                               nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
-
-                       String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                                       "msgRtr.topicfactory.aaf");
-
-                       // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
-
-                       permission = mrFactoryVal + nameSpace + "|create";
+       private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException {
+               String clientId = Strings.EMPTY;
+               if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) {
+                       LOGGER.info("Performing AAF authorization for topic {} creation.", topicName);
+                       String permission = buildPermission(topicName, operation);
                        DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+                       clientId = getAAFclientId(dmaapContext.getRequest());
 
                        if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
-
-                               LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
-
-                               ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                               DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                               "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
-                                                               + permission);
-
-                               LOGGER.info(errRes.toString());
-                               throw new DMaaPAccessDeniedException(errRes);
-
-                       } else {
-                               // if user is null and aaf authentication is ok then key should
-                               // be ""
-                               // key = "";
-                               /**
-                                * Added as part of AAF user it should return username
-                                */
-
-                               key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
-                               LOGGER.info("key ==================== " + key);
-
+                               LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}",
+                                       operation, topicName, clientId, permission);
+                               throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
+                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
+                                       "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission));
                        }
+               } else if(operation.equals(TOPIC_CREATE_OP)){
+                       final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
+                       clientId = (user != null) ? user.getKey() : Strings.EMPTY;
                }
+               return clientId;
+       }
 
-               try {
-                       final String topicName = topicBean.getTopicName();
-                       final String desc = topicBean.getTopicDescription();
-                       int partition = topicBean.getPartitionCount();
-                       // int replica = topicBean.getReplicationCount();
-                       String defaultPartitions = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                                       "default.partitions");
-                       String defaultReplicas = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                                       "default.replicas");
-                       if (partition == 0) {
-                               if(StringUtils.isNotEmpty(defaultPartitions)){
-                                       partition=Integer.parseInt(defaultPartitions);  
-                               }
-                               else{
-                               partition = 1;
-                               }
-                       }
-                       final int partitions = partition;
-
-                       int replica = topicBean.getReplicationCount();
-                       if (replica == 0) {
-                               if(StringUtils.isNotEmpty(defaultReplicas)){
-                                       replica=Integer.parseInt(defaultReplicas);      
-                               }
-                               else{
-                               replica = 1;
-                               }
-                       }
-                       final int replicas = replica;
-                       boolean transactionEnabled = topicBean.isTransactionEnabled();
-
-                       final Broker1 metabroker = getMetaBroker(dmaapContext);
-                       final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
+       private String getAAFclientId(HttpServletRequest request) {
+               Principal principal = request.getUserPrincipal();
+               if (principal !=null) {
+                       return principal.getName();
+               } else {
+                       LOGGER.warn("Performing AAF authorization but user has not been provided in request.");
+                       return null;
+               }
+       }
 
-                       LOGGER.info("Topic created successfully. Sending response");
-                       DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
-               } catch (JSONException excp) {
+       private boolean isTopicWithEnforcedAuthorization(String topicName) {
+               String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF");
+               return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace);
+       }
 
-                       LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
-                                       DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
-                       LOGGER.info(errRes.toString());
-                       throw new CambriaApiException(errRes);
+       int getValueOrDefault(int value, String defaultProperty) {
+               int returnValue = value;
+               if (returnValue <= 0) {
+                       String defaultValue = getPropertyFromAJSCmap(defaultProperty);
+                       returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1;
+                       returnValue = (returnValue <= 0) ? 1 : returnValue;
+               }
+               return returnValue;
+       }
 
-               } catch (ConfigDbException excp1) {
+       private String buildPermission(String topicName, String operation) {
+               String nameSpace = (topicName.indexOf('.') > 1) ?
+                       topicName.substring(0, topicName.lastIndexOf('.')) : "";
 
-                       LOGGER.error("Failed to create topic.  Config DB Exception", excp1);
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
-                                       DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
-                       LOGGER.info(errRes.toString());
-                       throw new CambriaApiException(errRes);
-               } catch (org.onap.dmaap.dmf.mr.metabroker.Broker1.TopicExistsException e) {
-                       // TODO Auto-generated catch block
-                       LOGGER.error( e.getMessage());
-               }
+               String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf");
+               return mrFactoryValue + nameSpace + "|" + operation;
        }
 
        /**
@@ -347,45 +312,10 @@ public class TopicServiceImpl implements TopicService {
        public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
                        CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
 
-
                LOGGER.info(" Deleting topic " + topicName);
-               /*if (true) { // {
-                       LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
-                                                       + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
-                       LOGGER.info(errRes.toString());
-                       throw new DMaaPAccessDeniedException(errRes);
-               }*/
-
-               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
-
-               if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
-                       LOGGER.info("Authenticating the user, as ACL authentication is not provided");
-                       // String permission =
-                       
-                       String permission = "";
-                       String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
-                       String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
-                                       "msgRtr.topicfactory.aaf");
-                       
-                       permission = mrFactoryVal + nameSpace + "|destroy";
-                       DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-                       if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
-                               LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
-                               ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                               DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                               errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
-                                                               + errorMessages.getNotPermitted2());
-                               LOGGER.info(errRes.toString());
-                               throw new DMaaPAccessDeniedException(errRes);
-                       }
-
-               }
-
-               final Broker1 metabroker = getMetaBroker(dmaapContext);
-               final Topic topic = metabroker.getTopic(topicName);
+               authorizeClient(dmaapContext, topicName, "destroy");
 
+               final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
                if (topic == null) {
                        LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
                        throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
@@ -394,7 +324,7 @@ public class TopicServiceImpl implements TopicService {
                // metabroker.deleteTopic(topicName);
 
                LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
-               DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
+               respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
        }
 
        /**
@@ -402,7 +332,7 @@ public class TopicServiceImpl implements TopicService {
         * @param dmaapContext
         * @return
         */
-       private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
+       DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
                return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
        }
 
@@ -429,7 +359,7 @@ public class TopicServiceImpl implements TopicService {
                final NsaAcl acl = topic.getWriterAcl();
 
                LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
-               DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
+               respondOk(dmaapContext, aclToJson(acl));
 
        }
 
@@ -474,7 +404,7 @@ public class TopicServiceImpl implements TopicService {
                final NsaAcl acl = topic.getReaderAcl();
 
                LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
-               DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
+               respondOk(dmaapContext, aclToJson(acl));
 
        }
 
@@ -483,7 +413,7 @@ public class TopicServiceImpl implements TopicService {
         * @param t
         * @return
         */
-       private static JSONObject topicToJson(Topic t) {
+       static JSONObject topicToJson(Topic t) {
                final JSONObject o = new JSONObject();
 
                o.put("name", t.getName());
@@ -507,7 +437,7 @@ public class TopicServiceImpl implements TopicService {
                        throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
 
                LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
-               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+               final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
 
                
                //
@@ -545,7 +475,7 @@ public class TopicServiceImpl implements TopicService {
 
                LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
                                + "]. Sending response.");
-               DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
+               respondOk(dmaapContext, "Write access has been granted to publisher.");
 
        }
 
@@ -566,7 +496,7 @@ public class TopicServiceImpl implements TopicService {
                        DMaaPAccessDeniedException {
 
                LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
-               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+               final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
                
                //
                //// String permission =
@@ -601,7 +531,7 @@ public class TopicServiceImpl implements TopicService {
 
                LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
                                + "]. Sending response.");
-               DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
+               respondOk(dmaapContext, "Write access has been revoked for publisher.");
 
        }
 
@@ -617,7 +547,7 @@ public class TopicServiceImpl implements TopicService {
                        DMaaPAccessDeniedException {
 
                LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
-               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+               final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
                
                //
                //// String permission =
@@ -651,7 +581,7 @@ public class TopicServiceImpl implements TopicService {
 
                LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
                                + "]. Sending response.");
-               DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
+               respondOk(dmaapContext,
                                "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
        }
 
@@ -667,7 +597,7 @@ public class TopicServiceImpl implements TopicService {
                        DMaaPAccessDeniedException {
 
                LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
-               final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
+               final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
                
                //// String permission =
                
@@ -701,7 +631,7 @@ public class TopicServiceImpl implements TopicService {
 
                LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
                                + "]. Sending response.");
-               DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
+               respondOk(dmaapContext,
                                "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
 
        }
diff --git a/src/test/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImplTest.java b/src/test/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImplTest.java
new file mode 100644 (file)
index 0000000..f287e8c
--- /dev/null
@@ -0,0 +1,861 @@
+/*
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dmaap.dmf.mr.service.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.contains;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import com.att.nsa.configs.ConfigDbException;
+import com.att.nsa.security.NsaAcl;
+import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import javax.servlet.ServletOutputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.onap.dmaap.dmf.mr.CambriaApiException;
+import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
+import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
+import org.onap.dmaap.dmf.mr.beans.TopicBean;
+import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
+import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
+import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
+import org.onap.dmaap.dmf.mr.metabroker.Broker1;
+import org.onap.dmaap.dmf.mr.metabroker.Topic;
+import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
+import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
+import sun.security.acl.PrincipalImpl;
+
+
+@RunWith(MockitoJUnitRunner.class)
+public class TopicServiceImplTest {
+
+
+    private static final String TOPIC_CREATE_PEM = "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:org.onap.dmaap.mr|create";
+    private static final String TOPIC_DELETE_PEM = "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:org.onap.dmaap.mr|destroy";
+    private NsaSimpleApiKey user = new NsaSimpleApiKey("admin", "password");
+    private TopicBean topicBean;
+
+    @Spy
+    private TopicServiceImpl topicService;
+
+    @Mock
+    private DMaaPErrorMessages errorMessages;
+
+    @Mock
+    private DMaaPContext dmaapContext;
+
+    @Mock
+    private ConfigurationReader configReader;
+
+    @Mock
+    private ServletOutputStream oStream;
+
+    @Mock
+    private DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator;
+
+    @Mock
+    private HttpServletRequest httpServReq;
+
+    @Mock
+    private HttpServletResponse httpServRes;
+
+    @Mock
+    private DMaaPKafkaMetaBroker dmaapKafkaMetaBroker;
+
+    @Mock
+    private Topic createdTopic;
+
+    @Mock
+    private NsaAcl nsaAcl;
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Before
+    public void setUp() throws Exception {
+        configureSpyInstance();
+        topicService.setErrorMessages(errorMessages);
+
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+    }
+
+    private void configureSpyInstance() throws Exception {
+        doReturn(user).when(topicService).getDmaapAuthenticatedUser(any(DMaaPContext.class));
+        doReturn(dmaapKafkaMetaBroker).when(topicService).getMetaBroker(any(DMaaPContext.class));
+        doNothing().when(topicService).respondOk(any(DMaaPContext.class),anyString());
+        doNothing().when(topicService).respondOk(any(DMaaPContext.class),any(JSONObject.class));
+        when(topicService.getPropertyFromAJSCbean("enforced.topic.name.AAF"))
+            .thenReturn("org.onap.dmaap.mr");
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf"))
+            .thenReturn("org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:");
+    }
+
+    private void givenTopicBean(String topicName) {
+        topicBean = new TopicBean();
+        topicBean.setTopicName(topicName);
+    }
+
+
+    @Test
+    public void createTopic_shouldSkipAAFAuthorization_whenCadiIsEnabled_andTopicNameNotEnforced() throws Exception {
+        //given
+        String topicName = "UNAUTHENTICATED.PRH.REGISTRATION";
+        givenTopicBean(topicName);
+
+        when(dmaapKafkaMetaBroker.createTopic(eq(topicName), anyString(), anyString(), anyInt(), anyInt(), anyBoolean()))
+            .thenReturn(createdTopic);
+
+        //when
+        topicService.createTopic(dmaapContext, topicBean);
+
+        //then
+        verify(dmaapKafkaMetaBroker).createTopic(eq(topicName), anyString(), anyString(), anyInt(), anyInt(),
+            anyBoolean());
+        verify(topicService).respondOk(eq(dmaapContext), any(JSONObject.class));
+        verify(httpServReq, never()).isUserInRole(TOPIC_CREATE_PEM);
+    }
+
+    @Test
+    public void createTopic_shouldSkipAAFAuthorization_whenCADIdisabled() throws Exception {
+        //given
+        String topicName = "org.onap.dmaap.mr.topic-2";
+        givenTopicBean(topicName);
+
+        when(dmaapKafkaMetaBroker.createTopic(eq(topicName), anyString(), anyString(), anyInt(), anyInt(), anyBoolean()))
+            .thenReturn(createdTopic);
+
+        //when
+        topicService.createTopic(dmaapContext, topicBean);
+
+        //then
+        verify(dmaapKafkaMetaBroker).createTopic(eq(topicName), anyString(), anyString(), anyInt(), anyInt(),
+            anyBoolean());
+        verify(topicService).respondOk(eq(dmaapContext), any(JSONObject.class));
+        verify(httpServReq, never()).isUserInRole(TOPIC_CREATE_PEM);
+    }
+
+    @Test
+    public void createTopic_shouldPass_whenCADIisDisabled_andNoUserInDmaapContext() throws Exception {
+        //given
+        String topicName = "org.onap.dmaap.mr.topic-3";
+        givenTopicBean(topicName);
+
+        doReturn(null).when(topicService).getDmaapAuthenticatedUser(dmaapContext);
+        when(dmaapKafkaMetaBroker.createTopic(eq(topicName), anyString(), anyString(), anyInt(), anyInt(), anyBoolean()))
+            .thenReturn(createdTopic);
+
+        //when
+        topicService.createTopic(dmaapContext, topicBean);
+
+        //then
+        verify(dmaapKafkaMetaBroker).createTopic(eq(topicName), anyString(), anyString(), anyInt(), anyInt(),
+            anyBoolean());
+        verify(topicService).respondOk(eq(dmaapContext), any(JSONObject.class));
+    }
+
+    @Test
+    public void createTopic_shouldPassWithAAFauthorization_whenCadiIsEnabled_andTopicNameWithEnforcedPrefix() throws Exception {
+        //given
+        String topicName = "org.onap.dmaap.mr.topic-4";
+        givenTopicBean(topicName);
+
+        when(topicService.isCadiEnabled()).thenReturn(true);
+        when(httpServReq.isUserInRole(TOPIC_CREATE_PEM)).thenReturn(true);
+        when(httpServReq.getUserPrincipal()).thenReturn(new PrincipalImpl("user"));
+        when(dmaapKafkaMetaBroker.createTopic(eq(topicName), anyString(), eq("user"), anyInt(), anyInt(), anyBoolean()))
+            .thenReturn(createdTopic);
+
+        //when
+        topicService.createTopic(dmaapContext, topicBean);
+
+        //then
+        verify(httpServReq).isUserInRole(TOPIC_CREATE_PEM);
+        verify(dmaapKafkaMetaBroker).createTopic(eq(topicName), anyString(), eq("user"), anyInt(), anyInt(), anyBoolean());
+        verify(topicService).respondOk(eq(dmaapContext), any(JSONObject.class));
+        verify(topicService, never()).getDmaapAuthenticatedUser(dmaapContext);
+    }
+
+    @Test
+    public void createTopic_shouldFailWithAAFauthorization_whenCadiIsEnabled_andTopicNameWithEnforcedPrefix() throws Exception {
+        //given
+        thrown.expect(DMaaPAccessDeniedException.class);
+
+        String topicName = "org.onap.dmaap.mr.topic-5";
+        givenTopicBean(topicName);
+
+        when(topicService.isCadiEnabled()).thenReturn(true);
+        when(httpServReq.isUserInRole(TOPIC_CREATE_PEM)).thenReturn(false);
+        when(httpServReq.getUserPrincipal()).thenReturn(new PrincipalImpl("user"));
+
+        when(dmaapKafkaMetaBroker.createTopic(eq(topicName), anyString(), eq("user"), anyInt(), anyInt(), anyBoolean()))
+            .thenReturn(createdTopic);
+
+        //when
+        topicService.createTopic(dmaapContext, topicBean);
+
+        //then
+        verify(httpServReq).isUserInRole(TOPIC_CREATE_PEM);
+        verify(topicService, never()).getDmaapAuthenticatedUser(dmaapContext);
+        verifyZeroInteractions(dmaapKafkaMetaBroker);
+        verifyZeroInteractions(createdTopic);
+    }
+
+    @Test
+    public void createTopic_shouldThrowApiException_whenBrokerThrowsConfigDbException() throws Exception {
+        //given
+        thrown.expect(CambriaApiException.class);
+
+        String topicName = "org.onap.dmaap.mr.topic-6";
+        givenTopicBean(topicName);
+
+        when(dmaapKafkaMetaBroker.createTopic(eq(topicName), anyString(), anyString(), anyInt(), anyInt(), anyBoolean()))
+            .thenThrow(new ConfigDbException("fail"));
+
+        //when
+        topicService.createTopic(dmaapContext, topicBean);
+
+        //then
+        verifyZeroInteractions(createdTopic);
+    }
+
+    @Test
+    public void createTopic_shouldFailGracefully_whenTopicExistsExceptionOccurs() throws Exception {
+        //given
+        String topicName = "org.onap.dmaap.mr.topic-7";
+        givenTopicBean(topicName);
+
+        when(dmaapKafkaMetaBroker.createTopic(eq(topicName), anyString(), anyString(), anyInt(), anyInt(), anyBoolean()))
+            .thenThrow(new Broker1.TopicExistsException("enfTopicNamePlusExtra"));
+
+        //when
+        topicService.createTopic(dmaapContext, topicBean);
+
+        //then
+        verifyZeroInteractions(createdTopic);
+    }
+
+    @Test
+    public void getValueOrDefault_shouldParseDeafultAndReturnIt_whenGivenValueIsZero() {
+        //given
+        int value = 0;
+        String defaultPropertyName = "propertyName";
+        when(topicService.getPropertyFromAJSCmap(defaultPropertyName)).thenReturn("6");
+
+        //when
+        int extracted = topicService.getValueOrDefault(value, defaultPropertyName);
+
+        //then
+        assertEquals(6, extracted);
+    }
+
+    @Test
+    public void getValueOrDefault_shouldReturnGivenValue_whenGreaterThanZero() {
+        //given
+        int value = 3;
+        String defaultPropertyName = "propertyName";
+
+        //when
+        int extracted = topicService.getValueOrDefault(value, defaultPropertyName);
+
+        //then
+        assertEquals(value, extracted);
+        verify(topicService, never()).getPropertyFromAJSCmap(defaultPropertyName);
+    }
+
+    @Test
+    public void getValueOrDefault_shouldParseDeafultAndReturnIt_whenGivenValueIsNegative() {
+        //given
+        int value = -3;
+        String defaultPropertyName = "propertyName";
+        when(topicService.getPropertyFromAJSCmap(defaultPropertyName)).thenReturn("6");
+
+        //when
+        int extracted = topicService.getValueOrDefault(value, defaultPropertyName);
+
+        //then
+        assertEquals(6, extracted);
+    }
+
+    @Test
+    public void getValueOrDefault_shouldReturnOne_whenGivenValueIsZero_andDefaultNotProvided() {
+        //given
+        int value = 0;
+        String defaultPropertyName = "propertyName";
+        when(topicService.getPropertyFromAJSCmap(defaultPropertyName)).thenReturn("");
+
+        //when
+        int extracted = topicService.getValueOrDefault(value, defaultPropertyName);
+
+        //then
+        assertEquals(1, extracted);
+    }
+
+    @Test
+    public void getValueOrDefault_shouldReturnOne_whenGivenValueIsZero_andDefaultNaN() {
+        //given
+        int value = 0;
+        String defaultPropertyName = "propertyName";
+        when(topicService.getPropertyFromAJSCmap(defaultPropertyName)).thenReturn("a");
+
+        //when
+        int extracted = topicService.getValueOrDefault(value, defaultPropertyName);
+
+        //then
+        assertEquals(1, extracted);
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testGetTopics_null_topic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException {
+
+        Assert.assertNotNull(topicService);
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(httpServReq.getMethod()).thenReturn("HEAD");
+
+        when(dmaapKafkaMetaBroker.getTopic(anyString())).thenReturn(null);
+
+        topicService.getTopic(dmaapContext, "topicName");
+    }
+
+    @Test
+    public void testGetTopics_NonNull_topic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException {
+
+        Assert.assertNotNull(topicService);
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+
+        when(dmaapKafkaMetaBroker.getTopic(anyString())).thenReturn(createdTopic);
+
+        when(createdTopic.getName()).thenReturn("topicName");
+        when(createdTopic.getDescription()).thenReturn("topicDescription");
+        when(createdTopic.getOwners()).thenReturn(new HashSet<>(Arrays.asList("user1,user2".split(","))));
+
+        when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
+        when(createdTopic.getWriterAcl()).thenReturn(nsaAcl);
+
+        when(httpServReq.getMethod()).thenReturn("HEAD");
+
+        when(httpServRes.getOutputStream()).thenReturn(oStream);
+
+        topicService.getTopic(dmaapContext, "topicName");
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testGetPublishersByTopicName_nullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
+        IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(httpServReq.getMethod()).thenReturn("HEAD");
+
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
+
+        topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
+
+    }
+
+    @Test
+    public void testGetPublishersByTopicName_nonNullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
+        IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(httpServReq.getMethod()).thenReturn("HEAD");
+
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
+        when(createdTopic.getWriterAcl()).thenReturn(nsaAcl);
+        topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testGetConsumersByTopicName_nullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
+        IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(httpServReq.getMethod()).thenReturn("HEAD");
+
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
+
+        topicService.getConsumersByTopicName(dmaapContext, "topicNamespace.name");
+
+    }
+
+    @Test
+    public void testGetConsumersByTopicName_nonNullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
+        IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(httpServReq.getMethod()).thenReturn("HEAD");
+
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
+
+        when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
+
+        topicService.getConsumersByTopicName(dmaapContext, "topicNamespace.name");
+    }
+
+    @Test
+    public void testGetPublishersByTopicName() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("topicFactoryAAF");
+
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(httpServReq.getMethod()).thenReturn("HEAD");
+
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
+
+        when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
+
+        topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testGetPublishersByTopicNameError() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("topicFactoryAAF");
+
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(httpServReq.getMethod()).thenReturn("HEAD");
+
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
+
+        when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
+
+        topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
+    }
+
+    @Test
+    public void deleteTopic_shouldDeleteTopic_whenUserAuthorizedWithAAF_andTopicExists() throws Exception {
+        //given
+        String topicName = "org.onap.dmaap.mr.topic-9";
+        when(topicService.isCadiEnabled()).thenReturn(true);
+        when(httpServReq.isUserInRole(TOPIC_DELETE_PEM)).thenReturn(true);
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+
+        //when
+        topicService.deleteTopic(dmaapContext, topicName);
+
+        //then
+        verify(httpServReq).isUserInRole(TOPIC_DELETE_PEM);
+        verify(topicService).respondOk(eq(dmaapContext), contains(topicName));
+        verify(topicService, never()).getDmaapAuthenticatedUser(dmaapContext);
+    }
+
+    @Test
+    public void deleteTopic_shouldSkipAAFauthorization_whenTopicNameNotEnforced() throws Exception {
+        //given
+        String topicName = "UNAUTHENTICATED.PRH.READY";
+        when(topicService.isCadiEnabled()).thenReturn(true);
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+
+        //when
+        topicService.deleteTopic(dmaapContext, topicName);
+
+        //then
+        verify(httpServReq, never()).isUserInRole(TOPIC_DELETE_PEM);
+        verify(topicService).respondOk(eq(dmaapContext), contains(topicName));
+    }
+
+    @Test
+    public void deleteTopic_shouldDeleteTopic_whenUserAuthorizedInContext_andTopicExists() throws Exception {
+        //given
+        String topicName = "org.onap.dmaap.mr.topic-10";
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+
+        //when
+        topicService.deleteTopic(dmaapContext, topicName);
+
+        //then
+        verify(httpServReq, never()).isUserInRole(TOPIC_DELETE_PEM);
+        verify(topicService).respondOk(eq(dmaapContext), contains(topicName));
+    }
+
+    @Test
+    public void deleteTopic_shouldNotDeleteTopic_whenUserNotAuthorizedByAAF() throws Exception {
+        //given
+        String topicName = "org.onap.dmaap.mr.topic-10";
+        thrown.expect(DMaaPAccessDeniedException.class);
+
+        when(topicService.isCadiEnabled()).thenReturn(true);
+        when(httpServReq.isUserInRole(TOPIC_DELETE_PEM)).thenReturn(false);
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(createdTopic);
+
+        //when
+        topicService.deleteTopic(dmaapContext, topicName);
+
+        //then
+        verify(httpServReq).isUserInRole(TOPIC_DELETE_PEM);
+        verify(topicService, never()).respondOk(eq(dmaapContext), anyString());
+        verify(topicService, never()).getDmaapAuthenticatedUser(dmaapContext);
+    }
+
+    @Test
+    public void deleteTopic_shouldNotDeleteTopic_whenTopicDoesNotExist() throws Exception {
+        //given
+        String topicName = "org.onap.dmaap.mr.topic-10";
+        thrown.expect(TopicExistsException.class);
+
+        when(dmaapKafkaMetaBroker.getTopic(topicName)).thenReturn(null);
+
+        //when
+        topicService.deleteTopic(dmaapContext, topicName);
+
+        //then
+        verify(topicService, never()).respondOk(eq(dmaapContext), anyString());
+    }
+
+    @Test
+    public void testPermitConsumerForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.permitConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testPermitConsumerForTopic_nulltopic()
+        throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.permitConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
+    }
+
+    @Test
+    public void testdenyConsumerForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.denyConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testdenyConsumerForTopic_nulltopic()
+        throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.denyConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
+    }
+
+
+    @Test
+    public void testPermitPublisherForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.permitPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testPermitPublisherForTopic_nulltopic()
+        throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.permitPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
+    }
+
+    @Test
+    public void testDenyPublisherForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+        when(dmaapContext.getResponse()).thenReturn(httpServRes);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.denyPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
+        ;
+    }
+
+    @Test(expected = TopicExistsException.class)
+    public void testDenyPublisherForTopic_nulltopic()
+        throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.denyPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
+        ;
+    }
+
+    @Test
+    public void testGetAllTopics() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.getAllTopics(dmaapContext);
+    }
+
+    @Test
+    public void testGetTopics() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
+        TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
+
+        Assert.assertNotNull(topicService);
+
+        // PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        //PowerMockito.mockStatic(AJSCPropertiesMap.class);
+        when(topicService.getPropertyFromAJSCmap("msgRtr.topicfactory.aaf")).thenReturn("hello");
+        //PowerMockito.mockStatic(DMaaPResponseBuilder.class);
+        when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
+        when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
+        when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
+        when(dmaapContext.getRequest()).thenReturn(httpServReq);
+
+        when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
+        when(dmaapContext.getConfigReader()).thenReturn(configReader);
+        when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
+        when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
+        TopicBean topicBean = new TopicBean();
+        topicBean.setTopicName("enfTopicNamePlusExtra");
+
+        topicService.getTopics(dmaapContext);
+    }
+
+
+}
index c437fe4..ec4b0e2 100644 (file)
@@ -25,10 +25,11 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 import org.junit.runners.Suite.SuiteClasses;
 import org.apache.log4j.Logger;
+import org.onap.dmaap.dmf.mr.service.impl.TopicServiceImplTest;
 
 @RunWith(Suite.class)
 @SuiteClasses({ UIServiceImplTest.class, AdminServiceImplemTest.class, ApiKeysServiceImplTest.class, 
-       ShowConsumerCacheTest.class,TopicServiceImplTest.class, TransactionServiceImplTest.class, MMServiceImplTest.class, 
+       ShowConsumerCacheTest.class,TopicServiceImplTest.class, TransactionServiceImplTest.class, MMServiceImplTest.class,
        BaseTransactionDbImplTest.class,  MetricsServiceImplTest.class,EventsServiceImplTest.class})
 public class JUnitTestSuite {
        private static final Logger LOGGER = Logger.getLogger(JUnitTestSuite.class);
diff --git a/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java b/src/test/java/org/onap/dmaap/mr/cambria/service/impl/TopicServiceImplTest.java
deleted file mode 100644 (file)
index 7cbdf79..0000000
+++ /dev/null
@@ -1,782 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * ONAP Policy Engine
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
- package org.onap.dmaap.mr.cambria.service.impl;
-
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashSet;
-
-import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import com.att.ajsc.beans.PropertiesMapBean;
-import com.att.ajsc.filemonitor.AJSCPropertiesMap;
-import org.onap.dmaap.dmf.mr.CambriaApiException;
-import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
-import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
-import org.onap.dmaap.dmf.mr.beans.TopicBean;
-import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
-import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
-import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
-import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
-import org.onap.dmaap.dmf.mr.metabroker.Topic;
-import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
-import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticator;
-import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
-import org.onap.dmaap.dmf.mr.service.impl.TopicServiceImpl;
-import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
-import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
-import com.att.nsa.configs.ConfigDbException;
-import com.att.nsa.security.NsaAcl;
-import com.att.nsa.security.NsaApiKey;
-import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-
-//@RunWith(MockitoJUnitRunner.class)
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ PropertiesMapBean.class, DMaaPAuthenticatorImpl.class,AJSCPropertiesMap.class,DMaaPResponseBuilder.class })
-public class TopicServiceImplTest {
-
-       TopicServiceImpl topicService;
-
-       @Mock
-       private DMaaPErrorMessages errorMessages;
-
-       @Mock
-       DMaaPContext dmaapContext;
-
-       @Mock
-       ConfigurationReader configReader;
-
-       @Mock
-       ServletOutputStream oStream;
-
-       @Mock
-       DMaaPAuthenticator<NsaSimpleApiKey> dmaaPAuthenticator;
-
-       @Mock
-       DMaaPAAFAuthenticator dmaapAAFauthenticator;
-       @Mock
-       NsaApiKey user;
-
-       @Mock
-       NsaSimpleApiKey nsaSimpleApiKey;
-
-       @Mock
-       HttpServletRequest httpServReq;
-
-       @Mock
-       HttpServletResponse httpServRes;
-
-       @Mock
-       DMaaPKafkaMetaBroker dmaapKafkaMetaBroker;
-
-       @Mock
-       Topic createdTopic;
-
-       @Mock
-       NsaAcl nsaAcl;
-
-       @Mock
-       JSONObject jsonObj;
-
-       @Mock
-       JSONArray jsonArray;
-
-       @Before
-       public void setUp() {
-               MockitoAnnotations.initMocks(this);
-               topicService = new TopicServiceImpl();
-               topicService.setErrorMessages(errorMessages);
-               NsaSimpleApiKey user = new NsaSimpleApiKey("admin", "password");
-               PowerMockito.mockStatic(DMaaPAuthenticatorImpl.class);
-               PowerMockito.when(DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext)).thenReturn(user);
-       }
-
-       @Test(expected = NullPointerException.class)
-       public void testCreateTopicWithEnforcedName()
-                       throws DMaaPAccessDeniedException, CambriaApiException, IOException, TopicExistsException {
-               
-               Assert.assertNotNull(topicService);
-               PowerMockito.mockStatic(PropertiesMapBean.class);
-
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
-                               .thenReturn("enfTopicName");
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
-               .thenReturn("1");
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
-               .thenReturn("1");
-
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-               topicService.createTopic(dmaapContext, topicBean);
-       }
-
-       @Test
-       public void testCreateTopicWithTopicNameNotEnforced()
-                       throws DMaaPAccessDeniedException, CambriaApiException, ConfigDbException,IOException,TopicExistsException, org.onap.dmaap.dmf.mr.metabroker.Broker1.TopicExistsException {
-
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(PropertiesMapBean.class);
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
-                               .thenReturn("enfTopicName");
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
-               .thenReturn("1");
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
-               .thenReturn("1");
-               
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(nsaSimpleApiKey);
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-
-               when(nsaAcl.isActive()).thenReturn(true);
-               when(nsaAcl.getUsers()).thenReturn(new HashSet<>(Arrays.asList("user1,user2".split(","))));
-
-               when(createdTopic.getName()).thenReturn("topicName");
-               when(createdTopic.getOwner()).thenReturn("Owner");
-               when(createdTopic.getDescription()).thenReturn("Description");
-               when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
-               when(createdTopic.getWriterAcl()).thenReturn(nsaAcl);
-
-               when(dmaapKafkaMetaBroker.createTopic(anyString(), anyString(), anyString(), anyInt(), anyInt(), anyBoolean()))
-                               .thenReturn(createdTopic);
-
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("NotEnforcedTopicName");
-
-               topicService.createTopic(dmaapContext, topicBean);
-
-               verify(dmaapKafkaMetaBroker, times(1)).createTopic(anyString(), anyString(), anyString(), anyInt(), anyInt(),
-                               anyBoolean());
-       }
-
-       @Test(expected = NullPointerException.class)
-       public void testCreateTopicNoUserInContextAndNoAuthHeader()
-                       throws DMaaPAccessDeniedException, CambriaApiException, IOException, TopicExistsException {
-               
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(PropertiesMapBean.class);
-
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
-                               .thenReturn("enfTopicName");
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
-               .thenReturn("1");
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
-               .thenReturn("1");
-
-               when(httpServReq.getHeader("Authorization")).thenReturn(null);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-               topicService.createTopic(dmaapContext, topicBean);
-       }
-
-       @Test(expected = NullPointerException.class)
-       public void testCreateTopicNoUserInContextAndAuthHeaderAndPermitted()
-                       throws DMaaPAccessDeniedException, CambriaApiException, IOException, TopicExistsException {
-               
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(PropertiesMapBean.class);
-
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "enforced.topic.name.AAF"))
-                               .thenReturn("enfTopicName");
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.partitions"))
-               .thenReturn("1");
-               when(PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, "default.replicas"))
-               .thenReturn("1");
-
-               when(httpServReq.getHeader("Authorization")).thenReturn("Authorization");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-
-               // when(dmaapAAFauthenticator.aafAuthentication(httpServReq,
-               // anyString())).thenReturn(false);
-
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-               topicService.createTopic(dmaapContext, topicBean);
-       }
-
-       @Test(expected = TopicExistsException.class)
-       public void testGetTopics_null_topic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException {
-
-               Assert.assertNotNull(topicService);
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(dmaapKafkaMetaBroker.getTopic(anyString())).thenReturn(null);
-
-               topicService.getTopic(dmaapContext, "topicName");
-       }
-
-       @Test
-       public void testGetTopics_NonNull_topic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException {
-
-               Assert.assertNotNull(topicService);
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-
-               when(dmaapKafkaMetaBroker.getTopic(anyString())).thenReturn(createdTopic);
-
-               when(createdTopic.getName()).thenReturn("topicName");
-               when(createdTopic.getDescription()).thenReturn("topicDescription");
-               when(createdTopic.getOwners()).thenReturn(new HashSet<>(Arrays.asList("user1,user2".split(","))));
-
-               when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
-               when(createdTopic.getWriterAcl()).thenReturn(nsaAcl);
-
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(httpServRes.getOutputStream()).thenReturn(oStream);
-
-               topicService.getTopic(dmaapContext, "topicName");
-       }
-
-       @Test(expected = TopicExistsException.class)
-       public void testGetPublishersByTopicName_nullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
-                       IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
-
-               topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
-
-       }
-
-       @Test
-       public void testGetPublishersByTopicName_nonNullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
-                       IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
-               when(createdTopic.getWriterAcl()).thenReturn(nsaAcl);
-               topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
-       }
-
-       @Test(expected = TopicExistsException.class)
-       public void testGetConsumersByTopicName_nullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
-                       IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
-
-               topicService.getConsumersByTopicName(dmaapContext, "topicNamespace.name");
-
-       }
-
-       @Test
-       public void testGetConsumersByTopicName_nonNullTopic() throws DMaaPAccessDeniedException, CambriaApiException,
-                       IOException, TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
-
-               when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
-
-               topicService.getConsumersByTopicName(dmaapContext, "topicNamespace.name");
-       }
-
-       @Test
-       public void testGetPublishersByTopicName() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("topicFactoryAAF");
-
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(createdTopic);
-
-               when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
-
-               topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
-       }
-       
-       @Test(expected=TopicExistsException.class)
-       public void testGetPublishersByTopicNameError() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("topicFactoryAAF");
-
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(httpServReq.getMethod()).thenReturn("HEAD");
-
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.name")).thenReturn(null);
-
-               when(createdTopic.getReaderAcl()).thenReturn(nsaAcl);
-
-               topicService.getPublishersByTopicName(dmaapContext, "topicNamespace.name");
-       }
-
-       @Test
-       public void testdeleteTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.deleteTopic(dmaapContext, "topicNamespace.topic");
-       }
-       
-       @Test(expected=TopicExistsException.class)
-       public void testdeleteTopic_nulltopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.deleteTopic(dmaapContext, "topicNamespace.topic");
-       }
-       
-       /*@Test(expected=DMaaPAccessDeniedException.class)
-       public void testdeleteTopic_authHeader() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-               PowerMockito.when(DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext)).thenReturn(null);
-               topicService.deleteTopic(dmaapContext, "topicNamespace.topic");
-       }*/
-       
-       @Test
-       public void testPermitConsumerForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.permitConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
-       }
-       
-       @Test(expected=TopicExistsException.class)
-       public void testPermitConsumerForTopic_nulltopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.permitConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
-       }
-       
-       @Test
-       public void testdenyConsumerForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.denyConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
-       }
-       
-       @Test(expected=TopicExistsException.class)
-       public void testdenyConsumerForTopic_nulltopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.denyConsumerForTopic(dmaapContext, "topicNamespace.topic", "admin");
-       }
-       
-       
-       @Test
-       public void testPermitPublisherForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.permitPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
-       }
-       
-       @Test(expected=TopicExistsException.class)
-       public void testPermitPublisherForTopic_nulltopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.permitPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");
-       }
-       
-       @Test
-       public void testDenyPublisherForTopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-               when(dmaapContext.getResponse()).thenReturn(httpServRes);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.denyPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");;
-       }
-       
-       @Test(expected=TopicExistsException.class)
-       public void testDenyPublisherForTopic_nulltopic() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(null);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.denyPublisherForTopic(dmaapContext, "topicNamespace.topic", "admin");;
-       }
-       
-       @Test
-       public void testGetAllTopics() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.getAllTopics(dmaapContext);
-       }
-       
-       @Test
-       public void testGetTopics() throws DMaaPAccessDeniedException, CambriaApiException, IOException,
-                       TopicExistsException, JSONException, ConfigDbException, AccessDeniedException {
-
-               Assert.assertNotNull(topicService);
-
-               // PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.mockStatic(AJSCPropertiesMap.class);
-               PowerMockito.when(AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, "msgRtr.topicfactory.aaf"))
-                               .thenReturn("hello");
-               PowerMockito.mockStatic(DMaaPResponseBuilder.class);
-               when(dmaaPAuthenticator.authenticate(dmaapContext)).thenReturn(null);
-               when(httpServReq.getHeader("AppName")).thenReturn("MyApp");
-               when(httpServReq.getHeader("Authorization")).thenReturn("Admin");
-               when(dmaapContext.getRequest()).thenReturn(httpServReq);
-
-               when(configReader.getfSecurityManager()).thenReturn(dmaaPAuthenticator);
-               when(dmaapContext.getConfigReader()).thenReturn(configReader);
-               when(configReader.getfMetaBroker()).thenReturn(dmaapKafkaMetaBroker);
-               when(dmaapKafkaMetaBroker.getTopic("topicNamespace.topic")).thenReturn(createdTopic);
-               TopicBean topicBean = new TopicBean();
-               topicBean.setTopicName("enfTopicNamePlusExtra");
-
-               topicService.getTopics(dmaapContext);
-       }
-       
-
-
-}