add test cases after the kafka 11 upgrade changes
[dmaap/messagerouter/messageservice.git] / src / main / java / com / att / nsa / dmaap / service / TopicRestService.java
index d8be745..3540664 100644 (file)
@@ -8,14 +8,14 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+*  
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *
+ *  
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
  *  
  *******************************************************************************/
@@ -45,26 +45,26 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.beans.DMaaPContext;
-import com.att.nsa.cambria.beans.TopicBean;
-import com.att.nsa.cambria.constants.CambriaConstants;
-import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
-import com.att.nsa.cambria.exception.DMaaPErrorMessages;
-import com.att.nsa.cambria.exception.DMaaPResponseCode;
-import com.att.nsa.cambria.exception.ErrorResponse;
-import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
-import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
-import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
-import com.att.nsa.cambria.service.TopicService;
-import com.att.nsa.cambria.utils.ConfigurationReader;
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.beans.DMaaPContext;
+import com.att.dmf.mr.beans.TopicBean;
+import com.att.dmf.mr.constants.CambriaConstants;
+import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
+import com.att.dmf.mr.exception.DMaaPErrorMessages;
+import com.att.dmf.mr.exception.DMaaPResponseCode;
+import com.att.dmf.mr.exception.ErrorResponse;
+import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
+import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
+import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
+import com.att.dmf.mr.service.TopicService;
+import com.att.dmf.mr.utils.ConfigurationReader;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
 
 /**
- * This class is a CXF REST service which acts as gateway for MR Topic Service.
- * 
- * @author author
+ * This class is a CXF REST service which acts 
+ * as gateway for MR Topic Service.
+ * @author Ramkumar Sembaiyan
  *
  */
 
@@ -75,8 +75,7 @@ public class TopicRestService {
        /**
         * Logger obj
         */
-       // private static final Logger LOGGER = Logger
-       // .getLogger(TopicRestService.class);
+       //private static final Logger LOGGER = Logger           .getLogger(TopicRestService.class);
        private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);
        /**
         * Config Reader
@@ -101,72 +100,76 @@ public class TopicRestService {
         * TopicService obj
         */
        @Autowired
-       private TopicService tService;
-
+       private TopicService topicService;
+       
        /**
         * DMaaPErrorMessages obj
         */
        @Autowired
        private DMaaPErrorMessages errorMessages;
-
-       private DMaaPContext dmaapContext = new DMaaPContext();
-
+       
        /**
         * mrNamespace
         */
-       // @Value("${msgRtr.namespace.aaf}")
-       // private String mrNamespace;
+       //@Value("${msgRtr.namespace.aaf}")
+//     private String mrNamespace;
+       
 
        /**
         * Fetches a list of topics from the current kafka instance and converted
         * into json object.
         * 
         * @return list of the topics in json format
-        * @throws AccessDeniedException
-        * @throws CambriaApiException
+        * @throws AccessDeniedException 
+        * @throws CambriaApiException 
         * @throws IOException
         * @throws JSONException
-        */
+        * */
        @GET
-       // @Produces(MediaType.TEXT_PLAIN)
+       //@Produces(MediaType.TEXT_PLAIN)
        public void getTopics() throws CambriaApiException {
                try {
-
+                                               
                        LOGGER.info("Authenticating the user before fetching the topics");
-                       // String permission = "com.att.dmaap.mr.topic|*|view";
-                       String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
-                                       "msgRtr.namespace.aaf");
-                       String permission = mrNameS + "|" + "*" + "|" + "view";
+                       //String permission = "com.att.dmaap.mr.topic|*|view";
+                       String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
+                       String permission =mrNameS+"|"+"*"+"|"+"view";
                        DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-                       // Check if client is using AAF CADI Basic Authorization
-                       // If yes then check for AAF role authentication else display all
-                       // topics
-                       if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
-                               if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
-
-                                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                                       errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
+                       //Check if client is using AAF CADI Basic Authorization
+                       //If yes then check for AAF role authentication else display all topics 
+                       if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+                       {
+                               if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+                               {
+                               
+                                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+                                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
+                                                       errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
                                        LOGGER.info(errRes.toString());
                                        throw new DMaaPAccessDeniedException(errRes);
-
+                                       
+                               
                                }
-                       }
-
-                       LOGGER.info("Fetching all Topics");
-
-                       tService.getTopics(getDmaapContext());
-
-                       LOGGER.info("Returning List of all Topics");
-
+                       }       
+                       
+                               LOGGER.info("Fetching all Topics");
+                                       //topicService = new com.att.dmf.mr.service.impl.TopicServiceImpl();
+                               topicService.getTopics(getDmaapContext());
+       
+                               LOGGER.info("Returning List of all Topics");
+                       
+                       
                } catch (JSONException | ConfigDbException | IOException excp) {
-                       LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
-                                       errorMessages.getTopicsfailure() + excp.getMessage());
+                       LOGGER.error(
+                                       "Failed to retrieve list of all topics: "
+                                                       + excp.getMessage(), excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
+                                       DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), 
+                                       errorMessages.getTopicsfailure()+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
+                       
 
                }
        }
@@ -176,56 +179,62 @@ public class TopicRestService {
         * into json object.
         * 
         * @return list of the topics in json format
-        * @throws AccessDeniedException
-        * @throws CambriaApiException
+        * @throws AccessDeniedException 
+        * @throws CambriaApiException 
         * @throws IOException
         * @throws JSONException
-        */
+        * */
        @GET
        @Path("/listAll")
-       // @Produces(MediaType.TEXT_PLAIN)
+       //@Produces(MediaType.TEXT_PLAIN)
        public void getAllTopics() throws CambriaApiException {
                try {
-
+                                               
                        LOGGER.info("Authenticating the user before fetching the topics");
-                       // String permission = "com.att.dmaap.mr.topic|*|view";
-                       String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
-                                       "msgRtr.namespace.aaf");
-                       String permission = mrNameS + "|" + "*" + "|" + "view";
+                       //String permission = "com.att.dmaap.mr.topic|*|view";
+                       String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
+                       String permission =mrNameS+"|"+"*"+"|"+"view";
                        DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-                       // Check if client is using AAF CADI Basic Authorization
-                       // If yes then check for AAF role authentication else display all
-                       // topics
-                       if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
-                               if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
-
-                                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                                       errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
+                       //Check if client is using AAF CADI Basic Authorization
+                       //If yes then check for AAF role authentication else display all topics 
+                       if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+                       {
+                               if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+                               {
+                               
+                                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+                                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
+                                                       errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
                                        LOGGER.info(errRes.toString());
                                        throw new DMaaPAccessDeniedException(errRes);
-
+                                       
+                               
                                }
-                       }
-
-                       LOGGER.info("Fetching all Topics");
-
-                       tService.getAllTopics(getDmaapContext());
-
-                       LOGGER.info("Returning List of all Topics");
-
+                       }       
+                       
+                               LOGGER.info("Fetching all Topics");
+                               
+                               topicService.getAllTopics(getDmaapContext());
+       
+                               LOGGER.info("Returning List of all Topics");
+                       
+                       
                } catch (JSONException | ConfigDbException | IOException excp) {
-                       LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
-                                       errorMessages.getTopicsfailure() + excp.getMessage());
+                       LOGGER.error(
+                                       "Failed to retrieve list of all topics: "
+                                                       + excp.getMessage(), excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
+                                       DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), 
+                                       errorMessages.getTopicsfailure()+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
+                       
 
                }
        }
 
+       
        /**
         * Returns details of the topic whose name is passed as a parameter
         * 
@@ -233,55 +242,59 @@ public class TopicRestService {
         *            - name of the topic
         * @return details of a topic whose name is mentioned in the request in json
         *         format.
-        * @throws AccessDeniedException
-        * @throws DMaaPAccessDeniedException
+        * @throws AccessDeniedException 
+        * @throws DMaaPAccessDeniedException 
         * @throws IOException
-        */
+        * */
        @GET
        @Path("/{topicName}")
-       // @Produces(MediaType.TEXT_PLAIN)
+       //@Produces(MediaType.TEXT_PLAIN)
        public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
                try {
-
-                       LOGGER.info("Authenticating the user before fetching the details about topic = " + topicName);
+                       
+                       LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName);
                        DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-
-                       // String permission=
-                       // "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
-
-                       // Check if client is using AAF CADI Basic Authorization
-                       // If yes then check for AAF role authentication else display all
-                       // topics
-                       if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
+                       
+                       //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
+                       
+                       //Check if client is using AAF CADI Basic Authorization
+                       //If yes then check for AAF role authentication else display all topics 
+                       if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
+                       {
                                String permission = aaf.aafPermissionString(topicName, "view");
-                               if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
-
-                                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                                       DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                                                       errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
-                                       LOGGER.info(errRes.toString());
-                                       throw new DMaaPAccessDeniedException(errRes);
-                               }
-                       }
-
-                       LOGGER.info("Fetching Topic: " + topicName);
-
-                       tService.getTopic(getDmaapContext(), topicName);
-
-                       LOGGER.info("Fetched details of topic: " + topicName);
-
+                               if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+                               {
+                                               
+                                               ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+                                                               DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
+                                                               errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
+                                               LOGGER.info(errRes.toString());
+                                               throw new DMaaPAccessDeniedException(errRes);
+                                       }
+                       } 
+                       
+                               LOGGER.info("Fetching Topic: " + topicName);
+                               
+                               topicService.getTopic(getDmaapContext(), topicName);
+
+                               LOGGER.info("Fetched details of topic: " + topicName);
+               
                } catch (ConfigDbException | IOException | TopicExistsException excp) {
-                       LOGGER.error("Failed to retrieve details of topic: " + topicName, excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
-                                       errorMessages.getTopicDetailsFail() + topicName + excp.getMessage());
+                       LOGGER.error("Failed to retrieve details of topic: " + topicName,
+                                       excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
+                                       DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(), 
+                                       errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
+                       
+                       
                }
        }
 
+       
+       
        /**
         * This method is still not working. Need to check on post call and how to
         * accept parameters for post call
@@ -290,50 +303,57 @@ public class TopicRestService {
         *            it will have the bean object
         * @throws TopicExistsException
         * @throws CambriaApiException
-        * @throws JSONException
+        * @throws JSONException 
         * @throws IOException
         * @throws AccessDeniedException
         * 
-        */
+        * */
        @POST
        @Path("/create")
        @Consumes({ MediaType.APPLICATION_JSON })
-       // @Produces(MediaType.TEXT_PLAIN)
-       public void createTopic(TopicBean topicBean) throws CambriaApiException{
-               try {
-                       LOGGER.info("Creating Topic." + topicBean.getTopicName());
-
-                       tService.createTopic(getDmaapContext(), topicBean);
+       //@Produces(MediaType.TEXT_PLAIN)
+       public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
+       try {
+                       LOGGER.info("Creating Topic."+topicBean.getTopicName());
+               
+                       topicService.createTopic(getDmaapContext(), topicBean);
 
                        LOGGER.info("Topic created Successfully.");
-               } catch (TopicExistsException ex) {
-
-                       LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
-                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
-                                       errorMessages.getCreateTopicFail() + ex.getMessage());
-                       LOGGER.info(errRes.toString());
-                       throw new CambriaApiException(errRes);
-
-               } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
-                       LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
-                                       errorMessages.getCreateTopicFail() + excp.getMessage());
+               } 
+       catch (TopicExistsException ex){
+               
+               LOGGER.error("Error while creating a topic: " + ex.getMessage(),
+                               ex);
+               
+               ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, 
+                               DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
+                               errorMessages.getCreateTopicFail()+ ex.getMessage());
+               LOGGER.info(errRes.toString());
+               throw new CambriaApiException(errRes);
+               
+               
+       
+       
+               }catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
+                       LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+                                       excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
+                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
+                                       errorMessages.getCreateTopicFail()+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
-               } catch (CambriaApiException | IOException excp) {
-                       LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
-                                       errorMessages.getCreateTopicFail() + excp.getMessage());
+                       
+               }catch (CambriaApiException |  IOException                       excp) {
+                       LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+                                       excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
+                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
+                                       errorMessages.getCreateTopicFail()+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
+                       
                }
        }
 
@@ -342,42 +362,45 @@ public class TopicRestService {
         * 
         * @param topicName
         *            topic
-        * @throws CambriaApiException
+        * @throws CambriaApiException 
         * @throws IOException
-        */
+        * */
        @DELETE
        @Path("/{topicName}")
-       // @Produces(MediaType.TEXT_PLAIN)
+       //@Produces(MediaType.TEXT_PLAIN)
        public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
                try {
                        LOGGER.info("Deleting Topic: " + topicName);
 
-                       tService.deleteTopic(getDmaapContext(), topicName);
+                       topicService.deleteTopic(getDmaapContext(), topicName);
 
                        LOGGER.info("Topic [" + topicName + "] deleted successfully.");
-               } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
-                       LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
-                                       errorMessages.getCreateTopicFail() + excp.getMessage());
+               } catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
+                       LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+                                       excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
+                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
+                                       errorMessages.getCreateTopicFail()+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
-               } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) {
+                       
+       }catch (IOException | ConfigDbException
+                               | CambriaApiException | TopicExistsException excp) {
                        LOGGER.error("Error while deleting topic: " + topicName, excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
-                                       errorMessages.getDeleteTopicFail() + topicName + excp.getMessage());
+               
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
+                                       DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(), 
+                                       errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
+                       
                }
        }
 
        private DMaaPContext getDmaapContext() {
 
+               DMaaPContext dmaapContext = new DMaaPContext();
                dmaapContext.setRequest(request);
                dmaapContext.setResponse(response);
                dmaapContext.setConfigReader(configReader);
@@ -390,48 +413,49 @@ public class TopicRestService {
         * This method will fetch the details of publisher by giving topic name
         * 
         * @param topicName
-        * @throws CambriaApiException
-        * @throws AccessDeniedException
+        * @throws CambriaApiException 
+        * @throws AccessDeniedException 
         */
        @GET
        @Path("/{topicName}/producers")
-       // @Produces(MediaType.TEXT_PLAIN)
-       public void getPublishersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException {
+       //@Produces(MediaType.TEXT_PLAIN)
+       public void getPublishersByTopicName(
+                       @PathParam("topicName") String topicName) throws CambriaApiException {
                try {
-
-                       // String permission =
-                       // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
-                       // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-                       // String permission = aaf.aafPermissionString(topicName, "view");
-                       // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
-                       // permission))
-                       // {
-                       LOGGER.info("Fetching list of all the publishers for topic " + topicName);
-
-                       tService.getPublishersByTopicName(getDmaapContext(), topicName);
-
-                       LOGGER.info("Returning list of all the publishers for topic " + topicName);
-                       // }else{
-                       // LOGGER.error("Error while fetching list of publishers for topic
-                       // "+ topicName);
-                       //
-                       // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                       // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                       // errorMessages.getNotPermitted1()+" fetch list of publishers
-                       // "+errorMessages.getNotPermitted2());
-                       // LOGGER.info(errRes);
-                       // throw new DMaaPAccessDeniedException(errRes);
-                       //
-                       // }
-
+                       
+//                     String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
+//                     DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+//                     String permission = aaf.aafPermissionString(topicName, "view");
+//                     if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+//                     {
+                               LOGGER.info("Fetching list of all the publishers for topic "
+                                               + topicName);
+
+                               topicService.getPublishersByTopicName(getDmaapContext(), topicName);
+
+                               LOGGER.info("Returning list of all the publishers for topic "
+                                               + topicName);
+//                     }else{
+//                             LOGGER.error("Error while fetching list of publishers for topic "+ topicName);
+//                             
+//                             ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+//                                             DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
+//                                             errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2());
+//                             LOGGER.info(errRes);
+//                             throw new DMaaPAccessDeniedException(errRes);
+//                             
+//                     }
+                       
                } catch (IOException | ConfigDbException | TopicExistsException excp) {
-                       LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp);
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
-                                       "Error while fetching list of publishers for topic: " + topicName + excp.getMessage());
+                       LOGGER.error("Error while fetching list of publishers for topic "
+                                       + topicName, excp);
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
+                                       DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(), 
+                                       "Error while fetching list of publishers for topic: "
+                                                       + topicName + excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
+                       
                }
        }
 
@@ -440,38 +464,44 @@ public class TopicRestService {
         * 
         * @param topicName
         * @param producerId
-        * @throws CambriaApiException
+        * @throws CambriaApiException 
         */
        @PUT
        @Path("/{topicName}/producers/{producerId}")
-       public void permitPublisherForTopic(@PathParam("topicName") String topicName,
+       public void permitPublisherForTopic(
+                       @PathParam("topicName") String topicName,
                        @PathParam("producerId") String producerId) throws CambriaApiException {
                try {
-                       LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
+                       LOGGER.info("Granting write access to producer [" + producerId
+                                       + "] for topic " + topicName);
 
-                       tService.permitPublisherForTopic(getDmaapContext(), topicName, producerId);
+                       topicService.permitPublisherForTopic(getDmaapContext(), topicName,
+                                       producerId);
 
-                       LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName);
+                       LOGGER.info("Write access has been granted to producer ["
+                                       + producerId + "] for topic " + topicName);
                } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
-                       LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
-                                       errorMessages.getCreateTopicFail() + excp.getMessage());
+                       LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+                                       excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
+                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
+                                       errorMessages.getCreateTopicFail()+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
-               } catch (ConfigDbException | IOException | TopicExistsException excp) {
-                       LOGGER.error("Error while granting write access to producer [" + producerId + "] for topic " + topicName,
-                                       excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
-                                       "Error while granting write access to producer [" + producerId + "] for topic " + topicName
-                                                       + excp.getMessage());
+                       
+         }catch ( ConfigDbException | IOException
+                               | TopicExistsException excp) {
+                       LOGGER.error("Error while granting write access to producer ["
+                                       + producerId + "] for topic " + topicName, excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
+                                       DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(), 
+                                       "Error while granting write access to producer ["
+                                                       + producerId + "] for topic " + topicName + excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
+                       
                }
        }
 
@@ -480,34 +510,39 @@ public class TopicRestService {
         * 
         * @param topicName
         * @param producerId
-        * @throws CambriaApiException
+        * @throws CambriaApiException 
         */
        @DELETE
        @Path("/{topicName}/producers/{producerId}")
        public void denyPublisherForTopic(@PathParam("topicName") String topicName,
                        @PathParam("producerId") String producerId) throws CambriaApiException {
                try {
-                       LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
+                       LOGGER.info("Revoking write access to producer [" + producerId
+                                       + "] for topic " + topicName);
 
-                       tService.denyPublisherForTopic(getDmaapContext(), topicName, producerId);
+                       topicService.denyPublisherForTopic(getDmaapContext(), topicName,
+                                       producerId);
 
-                       LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName);
+                       LOGGER.info("Write access revoked for producer [" + producerId
+                                       + "] for topic " + topicName);
                } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
-                       LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
-                                       errorMessages.getCreateTopicFail() + excp.getMessage());
+                       LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+                                       excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
+                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
+                                       errorMessages.getCreateTopicFail()+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
-               } catch (ConfigDbException | IOException | TopicExistsException excp) {
-                       LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName,
-                                       excp);
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                       DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
-                                       "Error while revoking write access to producer [" + producerId + "] for topic " + topicName
-                                                       + excp.getMessage());
+                       
+       }catch ( ConfigDbException | IOException
+                               | TopicExistsException excp) {
+                       LOGGER.error("Error while revoking write access for producer ["
+                                       + producerId + "] for topic " + topicName, excp);
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+                                       DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(), 
+                                       "Error while revoking write access to producer ["
+                                                       + producerId + "] for topic " + topicName + excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                }
@@ -517,51 +552,55 @@ public class TopicRestService {
         * Get the consumer details by the topic name
         * 
         * @param topicName
-        * @throws AccessDeniedException
-        * @throws CambriaApiException
+        * @throws AccessDeniedException 
+        * @throws CambriaApiException 
         */
        @GET
        @Path("/{topicName}/consumers")
-       // @Produces(MediaType.TEXT_PLAIN)
-       public void getConsumersByTopicName(@PathParam("topicName") String topicName)
-                       throws CambriaApiException {
+       //@Produces(MediaType.TEXT_PLAIN)
+       public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException, 
+       CambriaApiException {
                try {
-
-                       // String permission =
-                       // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
-                       // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
-                       // String permission = aaf.aafPermissionString(topicName, "view");
-                       // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
-                       // permission))
-                       // {
-                       LOGGER.info("Fetching list of all consumers for topic " + topicName);
-
-                       tService.getConsumersByTopicName(getDmaapContext(), topicName);
-
-                       LOGGER.info("Returning list of all consumers for topic " + topicName);
-
-                       // }else{
-                       // LOGGER.error(
-                       // "Error while fetching list of all consumers for topic "
-                       // + topicName);
-                       // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                       // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
-                       // errorMessages.getNotPermitted1()+" fetch list of consumers
-                       // "+errorMessages.getNotPermitted2());
-                       // LOGGER.info(errRes);
-                       // throw new DMaaPAccessDeniedException(errRes);
-                       //
-                       //
-                       // }
-
+                       
+                       
+//                     String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
+//                     DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
+//                     String permission = aaf.aafPermissionString(topicName, "view");
+//                     if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
+//                     {
+                               LOGGER.info("Fetching list of all consumers for topic " + topicName);
+
+                               topicService.getConsumersByTopicName(getDmaapContext(), topicName);
+
+                               LOGGER.info("Returning list of all consumers for topic "
+                                               + topicName);
+                               
+//                     }else{
+//                             LOGGER.error(
+//                                             "Error while fetching list of all consumers for topic "
+//                                                             + topicName);
+//                             ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+//                                             DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
+//                                             errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2());
+//                             LOGGER.info(errRes);
+//                             throw new DMaaPAccessDeniedException(errRes);
+//                             
+//                             
+//                     }
+                       
+                       
+                       
                } catch (IOException | ConfigDbException | TopicExistsException excp) {
-                       LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp);
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                       DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
-                                       "Error while fetching list of all consumers for topic: " + topicName + excp.getMessage());
+                       LOGGER.error(
+                                       "Error while fetching list of all consumers for topic "
+                                                       + topicName, excp);
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+                                       DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(), 
+                                       "Error while fetching list of all consumers for topic: "
+                                                       + topicName+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
+                       
                }
        }
 
@@ -570,28 +609,33 @@ public class TopicRestService {
         * 
         * @param topicName
         * @param consumerId
-        * @throws CambriaApiException
+        * @throws CambriaApiException 
         */
        @PUT
        @Path("/{topicName}/consumers/{consumerId}")
-       public void permitConsumerForTopic(@PathParam("topicName") String topicName,
+       public void permitConsumerForTopic(
+                       @PathParam("topicName") String topicName,
                        @PathParam("consumerId") String consumerId) throws CambriaApiException {
                try {
-                       LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
-
-                       tService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId);
-
-                       LOGGER.info("Read access granted to consumer [" + consumerId + "] for topic " + topicName);
-               } catch (AccessDeniedException | ConfigDbException | IOException | TopicExistsException excp) {
-                       LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName,
-                                       excp);
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                       DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
-                                       "Error while granting read access to consumer [" + consumerId + "] for topic " + topicName
-                                                       + excp.getMessage());
+                       LOGGER.info("Granting read access to consumer [" + consumerId
+                                       + "] for topic " + topicName);
+
+                       topicService.permitConsumerForTopic(getDmaapContext(), topicName,
+                                       consumerId);
+
+                       LOGGER.info("Read access granted to consumer [" + consumerId
+                                       + "] for topic " + topicName);
+               } catch (AccessDeniedException | ConfigDbException | IOException
+                               | TopicExistsException excp) {
+                       LOGGER.error("Error while granting read access to consumer ["
+                                       + consumerId + "] for topic " + topicName, excp);
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+                                       DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(), 
+                                       "Error while granting read access to consumer ["
+                                                       + consumerId + "] for topic " + topicName+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-
+                       
                }
        }
 
@@ -600,37 +644,53 @@ public class TopicRestService {
         * 
         * @param topicName
         * @param consumerId
-        * @throws CambriaApiException
+        * @throws CambriaApiException 
         */
        @DELETE
        @Path("/{topicName}/consumers/{consumerId}")
        public void denyConsumerForTopic(@PathParam("topicName") String topicName,
                        @PathParam("consumerId") String consumerId) throws CambriaApiException {
                try {
-                       LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
-
-                       tService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId);
-
-                       LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName);
-               } catch (ConfigDbException | IOException | TopicExistsException excp) {
-                       LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName,
-                                       excp);
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
-                                       DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
-                                       "Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName
-                                                       + excp.getMessage());
+                       LOGGER.info("Revoking read access to consumer [" + consumerId
+                                       + "] for topic " + topicName);
+
+                       topicService.denyConsumerForTopic(getDmaapContext(), topicName,
+                                       consumerId);
+
+                       LOGGER.info("Read access revoked to consumer [" + consumerId
+                                       + "] for topic " + topicName);
+               } catch ( ConfigDbException | IOException
+                               | TopicExistsException excp) {
+                       LOGGER.error("Error while revoking read access to consumer ["
+                                       + consumerId + "] for topic " + topicName, excp);
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
+                                       DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(), 
+                                       "Error while revoking read access to consumer ["
+                                                       + consumerId + "] for topic " + topicName+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-               } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
-                       LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
-
-                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
-                                       errorMessages.getCreateTopicFail() + excp.getMessage());
+               }catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
+                       LOGGER.error("Error while creating a topic: " + excp.getMessage(),
+                                       excp);
+                       
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
+                                       DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
+                                       errorMessages.getCreateTopicFail()+ excp.getMessage());
                        LOGGER.info(errRes.toString());
                        throw new CambriaApiException(errRes);
+                       
+       }
+       }
 
-               }
+       public TopicService getTopicService() {
+               return topicService;
+       }
+
+       public void setTopicService(TopicService topicService) {
+               this.topicService = topicService;
        }
 
+
+       
+
 }