597390a834fca63ebbc12d7f406a4d7359c69859
[dmaap/messagerouter/messageservice.git] / src / main / java / com / att / nsa / dmaap / service / TopicRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.dmaap.service;
23
24 import java.io.IOException;
25 import javax.servlet.http.HttpServletRequest;
26 import javax.servlet.http.HttpServletResponse;
27 import javax.ws.rs.Consumes;
28 import javax.ws.rs.DELETE;
29 import javax.ws.rs.GET;
30 import javax.ws.rs.POST;
31 import javax.ws.rs.PUT;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import javax.ws.rs.core.MediaType;
37
38 import org.apache.http.HttpStatus;
39
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42
43 import org.json.JSONException;
44 import org.springframework.beans.factory.annotation.Autowired;
45 import org.springframework.beans.factory.annotation.Qualifier;
46 import org.springframework.beans.factory.annotation.Value;
47 import org.springframework.stereotype.Component;
48 import com.att.nsa.cambria.CambriaApiException;
49 import com.att.nsa.cambria.beans.DMaaPContext;
50 import com.att.nsa.cambria.beans.TopicBean;
51 import com.att.nsa.cambria.constants.CambriaConstants;
52 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
53 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
54 import com.att.nsa.cambria.exception.DMaaPResponseCode;
55 import com.att.nsa.cambria.exception.ErrorResponse;
56 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
57 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
58 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
59 import com.att.nsa.cambria.service.TopicService;
60 import com.att.nsa.cambria.utils.ConfigurationReader;
61 import com.att.nsa.configs.ConfigDbException;
62 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
63
64 /**
65  * This class is a CXF REST service which acts as gateway for MR Topic Service.
66  * 
67  * @author author
68  *
69  */
70
71 @Component
72 @Path("/")
73 public class TopicRestService {
74
75         /**
76          * Logger obj
77          */
78         // private static final Logger LOGGER = Logger
79         // .getLogger(TopicRestService.class);
80         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);
81         /**
82          * Config Reader
83          */
84         @Autowired
85         @Qualifier("configurationReader")
86         private ConfigurationReader configReader;
87
88         /**
89          * HttpServletRequest obj
90          */
91         @Context
92         private HttpServletRequest request;
93
94         /**
95          * HttpServletResponse obj
96          */
97         @Context
98         private HttpServletResponse response;
99
100         /**
101          * TopicService obj
102          */
103         @Autowired
104         private TopicService tService;
105
106         /**
107          * DMaaPErrorMessages obj
108          */
109         @Autowired
110         private DMaaPErrorMessages errorMessages;
111
112         private DMaaPContext dmaapContext = new DMaaPContext();
113
114         /**
115          * mrNamespace
116          */
117         // @Value("${msgRtr.namespace.aaf}")
118         // private String mrNamespace;
119
120         /**
121          * Fetches a list of topics from the current kafka instance and converted
122          * into json object.
123          * 
124          * @return list of the topics in json format
125          * @throws AccessDeniedException
126          * @throws CambriaApiException
127          * @throws IOException
128          * @throws JSONException
129          */
130         @GET
131         // @Produces(MediaType.TEXT_PLAIN)
132         public void getTopics() throws CambriaApiException {
133                 try {
134
135                         LOGGER.info("Authenticating the user before fetching the topics");
136                         // String permission = "com.att.dmaap.mr.topic|*|view";
137                         String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
138                                         "msgRtr.namespace.aaf");
139                         String permission = mrNameS + "|" + "*" + "|" + "view";
140                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
141                         // Check if client is using AAF CADI Basic Authorization
142                         // If yes then check for AAF role authentication else display all
143                         // topics
144                         if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
145                                 if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
146
147                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
148                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
149                                                         errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
150                                         LOGGER.info(errRes.toString());
151                                         throw new DMaaPAccessDeniedException(errRes);
152
153                                 }
154                         }
155
156                         LOGGER.info("Fetching all Topics");
157
158                         tService.getTopics(getDmaapContext());
159
160                         LOGGER.info("Returning List of all Topics");
161
162                 } catch (JSONException | ConfigDbException | IOException excp) {
163                         LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
164
165                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
166                                         DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
167                                         errorMessages.getTopicsfailure() + excp.getMessage());
168                         LOGGER.info(errRes.toString());
169                         throw new CambriaApiException(errRes);
170
171                 }
172         }
173
174         /**
175          * Fetches a list of topics from the current kafka instance and converted
176          * into json object.
177          * 
178          * @return list of the topics in json format
179          * @throws AccessDeniedException
180          * @throws CambriaApiException
181          * @throws IOException
182          * @throws JSONException
183          */
184         @GET
185         @Path("/listAll")
186         // @Produces(MediaType.TEXT_PLAIN)
187         public void getAllTopics() throws CambriaApiException {
188                 try {
189
190                         LOGGER.info("Authenticating the user before fetching the topics");
191                         // String permission = "com.att.dmaap.mr.topic|*|view";
192                         String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
193                                         "msgRtr.namespace.aaf");
194                         String permission = mrNameS + "|" + "*" + "|" + "view";
195                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
196                         // Check if client is using AAF CADI Basic Authorization
197                         // If yes then check for AAF role authentication else display all
198                         // topics
199                         if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
200                                 if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
201
202                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
203                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
204                                                         errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
205                                         LOGGER.info(errRes.toString());
206                                         throw new DMaaPAccessDeniedException(errRes);
207
208                                 }
209                         }
210
211                         LOGGER.info("Fetching all Topics");
212
213                         tService.getAllTopics(getDmaapContext());
214
215                         LOGGER.info("Returning List of all Topics");
216
217                 } catch (JSONException | ConfigDbException | IOException excp) {
218                         LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
219
220                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
221                                         DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
222                                         errorMessages.getTopicsfailure() + excp.getMessage());
223                         LOGGER.info(errRes.toString());
224                         throw new CambriaApiException(errRes);
225
226                 }
227         }
228
229         /**
230          * Returns details of the topic whose name is passed as a parameter
231          * 
232          * @param topicName
233          *            - name of the topic
234          * @return details of a topic whose name is mentioned in the request in json
235          *         format.
236          * @throws AccessDeniedException
237          * @throws DMaaPAccessDeniedException
238          * @throws IOException
239          */
240         @GET
241         @Path("/{topicName}")
242         // @Produces(MediaType.TEXT_PLAIN)
243         public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
244                 try {
245
246                         LOGGER.info("Authenticating the user before fetching the details about topic = " + topicName);
247                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
248
249                         // String permission=
250                         // "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
251
252                         // Check if client is using AAF CADI Basic Authorization
253                         // If yes then check for AAF role authentication else display all
254                         // topics
255                         if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
256                                 String permission = aaf.aafPermissionString(topicName, "view");
257                                 if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
258
259                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
260                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
261                                                         errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
262                                         LOGGER.info(errRes.toString());
263                                         throw new DMaaPAccessDeniedException(errRes);
264                                 }
265                         }
266
267                         LOGGER.info("Fetching Topic: " + topicName);
268
269                         tService.getTopic(getDmaapContext(), topicName);
270
271                         LOGGER.info("Fetched details of topic: " + topicName);
272
273                 } catch (ConfigDbException | IOException | TopicExistsException excp) {
274                         LOGGER.error("Failed to retrieve details of topic: " + topicName, excp);
275
276                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
277                                         DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
278                                         errorMessages.getTopicDetailsFail() + topicName + excp.getMessage());
279                         LOGGER.info(errRes.toString());
280                         throw new CambriaApiException(errRes);
281
282                 }
283         }
284
285         /**
286          * This method is still not working. Need to check on post call and how to
287          * accept parameters for post call
288          * 
289          * @param topicBean
290          *            it will have the bean object
291          * @throws TopicExistsException
292          * @throws CambriaApiException
293          * @throws JSONException
294          * @throws IOException
295          * @throws AccessDeniedException
296          * 
297          */
298         @POST
299         @Path("/create")
300         @Consumes({ MediaType.APPLICATION_JSON })
301         // @Produces(MediaType.TEXT_PLAIN)
302         public void createTopic(TopicBean topicBean) throws CambriaApiException{
303                 try {
304                         LOGGER.info("Creating Topic." + topicBean.getTopicName());
305
306                         tService.createTopic(getDmaapContext(), topicBean);
307
308                         LOGGER.info("Topic created Successfully.");
309                 } catch (TopicExistsException ex) {
310
311                         LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex);
312
313                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
314                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
315                                         errorMessages.getCreateTopicFail() + ex.getMessage());
316                         LOGGER.info(errRes.toString());
317                         throw new CambriaApiException(errRes);
318
319                 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
320                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
321
322                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
323                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
324                                         errorMessages.getCreateTopicFail() + excp.getMessage());
325                         LOGGER.info(errRes.toString());
326                         throw new CambriaApiException(errRes);
327
328                 } catch (CambriaApiException | IOException excp) {
329                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
330
331                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
332                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
333                                         errorMessages.getCreateTopicFail() + excp.getMessage());
334                         LOGGER.info(errRes.toString());
335                         throw new CambriaApiException(errRes);
336
337                 }
338         }
339
340         /**
341          * Deletes existing topic whose name is passed as a parameter
342          * 
343          * @param topicName
344          *            topic
345          * @throws CambriaApiException
346          * @throws IOException
347          */
348         @DELETE
349         @Path("/{topicName}")
350         // @Produces(MediaType.TEXT_PLAIN)
351         public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
352                 try {
353                         LOGGER.info("Deleting Topic: " + topicName);
354
355                         tService.deleteTopic(getDmaapContext(), topicName);
356
357                         LOGGER.info("Topic [" + topicName + "] deleted successfully.");
358                 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
359                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
360
361                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
362                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
363                                         errorMessages.getCreateTopicFail() + excp.getMessage());
364                         LOGGER.info(errRes.toString());
365                         throw new CambriaApiException(errRes);
366
367                 } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) {
368                         LOGGER.error("Error while deleting topic: " + topicName, excp);
369
370                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
371                                         DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
372                                         errorMessages.getDeleteTopicFail() + topicName + excp.getMessage());
373                         LOGGER.info(errRes.toString());
374                         throw new CambriaApiException(errRes);
375
376                 }
377         }
378
379         private DMaaPContext getDmaapContext() {
380
381                 dmaapContext.setRequest(request);
382                 dmaapContext.setResponse(response);
383                 dmaapContext.setConfigReader(configReader);
384
385                 return dmaapContext;
386
387         }
388
389         /**
390          * This method will fetch the details of publisher by giving topic name
391          * 
392          * @param topicName
393          * @throws CambriaApiException
394          * @throws AccessDeniedException
395          */
396         @GET
397         @Path("/{topicName}/producers")
398         // @Produces(MediaType.TEXT_PLAIN)
399         public void getPublishersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException {
400                 try {
401
402                         // String permission =
403                         // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
404                         // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
405                         // String permission = aaf.aafPermissionString(topicName, "view");
406                         // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
407                         // permission))
408                         // {
409                         LOGGER.info("Fetching list of all the publishers for topic " + topicName);
410
411                         tService.getPublishersByTopicName(getDmaapContext(), topicName);
412
413                         LOGGER.info("Returning list of all the publishers for topic " + topicName);
414                         // }else{
415                         // LOGGER.error("Error while fetching list of publishers for topic
416                         // "+ topicName);
417                         //
418                         // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
419                         // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
420                         // errorMessages.getNotPermitted1()+" fetch list of publishers
421                         // "+errorMessages.getNotPermitted2());
422                         // LOGGER.info(errRes);
423                         // throw new DMaaPAccessDeniedException(errRes);
424                         //
425                         // }
426
427                 } catch (IOException | ConfigDbException | TopicExistsException excp) {
428                         LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp);
429                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
430                                         DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
431                                         "Error while fetching list of publishers for topic: " + topicName + excp.getMessage());
432                         LOGGER.info(errRes.toString());
433                         throw new CambriaApiException(errRes);
434
435                 }
436         }
437
438         /**
439          * proving permission for the topic for a particular publisher id
440          * 
441          * @param topicName
442          * @param producerId
443          * @throws CambriaApiException
444          */
445         @PUT
446         @Path("/{topicName}/producers/{producerId}")
447         public void permitPublisherForTopic(@PathParam("topicName") String topicName,
448                         @PathParam("producerId") String producerId) throws CambriaApiException {
449                 try {
450                         LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
451
452                         tService.permitPublisherForTopic(getDmaapContext(), topicName, producerId);
453
454                         LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName);
455                 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
456                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
457
458                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
459                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
460                                         errorMessages.getCreateTopicFail() + excp.getMessage());
461                         LOGGER.info(errRes.toString());
462                         throw new CambriaApiException(errRes);
463
464                 } catch (ConfigDbException | IOException | TopicExistsException excp) {
465                         LOGGER.error("Error while granting write access to producer [" + producerId + "] for topic " + topicName,
466                                         excp);
467
468                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
469                                         DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
470                                         "Error while granting write access to producer [" + producerId + "] for topic " + topicName
471                                                         + excp.getMessage());
472                         LOGGER.info(errRes.toString());
473                         throw new CambriaApiException(errRes);
474
475                 }
476         }
477
478         /**
479          * Removing access for a publisher id for any particular topic
480          * 
481          * @param topicName
482          * @param producerId
483          * @throws CambriaApiException
484          */
485         @DELETE
486         @Path("/{topicName}/producers/{producerId}")
487         public void denyPublisherForTopic(@PathParam("topicName") String topicName,
488                         @PathParam("producerId") String producerId) throws CambriaApiException {
489                 try {
490                         LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
491
492                         tService.denyPublisherForTopic(getDmaapContext(), topicName, producerId);
493
494                         LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName);
495                 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
496                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
497
498                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
499                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
500                                         errorMessages.getCreateTopicFail() + excp.getMessage());
501                         LOGGER.info(errRes.toString());
502                         throw new CambriaApiException(errRes);
503
504                 } catch (ConfigDbException | IOException | TopicExistsException excp) {
505                         LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName,
506                                         excp);
507                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
508                                         DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
509                                         "Error while revoking write access to producer [" + producerId + "] for topic " + topicName
510                                                         + excp.getMessage());
511                         LOGGER.info(errRes.toString());
512                         throw new CambriaApiException(errRes);
513                 }
514         }
515
516         /**
517          * Get the consumer details by the topic name
518          * 
519          * @param topicName
520          * @throws AccessDeniedException
521          * @throws CambriaApiException
522          */
523         @GET
524         @Path("/{topicName}/consumers")
525         // @Produces(MediaType.TEXT_PLAIN)
526         public void getConsumersByTopicName(@PathParam("topicName") String topicName)
527                         throws AccessDeniedException, CambriaApiException {
528                 try {
529
530                         // String permission =
531                         // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
532                         // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
533                         // String permission = aaf.aafPermissionString(topicName, "view");
534                         // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
535                         // permission))
536                         // {
537                         LOGGER.info("Fetching list of all consumers for topic " + topicName);
538
539                         tService.getConsumersByTopicName(getDmaapContext(), topicName);
540
541                         LOGGER.info("Returning list of all consumers for topic " + topicName);
542
543                         // }else{
544                         // LOGGER.error(
545                         // "Error while fetching list of all consumers for topic "
546                         // + topicName);
547                         // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
548                         // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
549                         // errorMessages.getNotPermitted1()+" fetch list of consumers
550                         // "+errorMessages.getNotPermitted2());
551                         // LOGGER.info(errRes);
552                         // throw new DMaaPAccessDeniedException(errRes);
553                         //
554                         //
555                         // }
556
557                 } catch (IOException | ConfigDbException | TopicExistsException excp) {
558                         LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp);
559                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
560                                         DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
561                                         "Error while fetching list of all consumers for topic: " + topicName + excp.getMessage());
562                         LOGGER.info(errRes.toString());
563                         throw new CambriaApiException(errRes);
564
565                 }
566         }
567
568         /**
569          * providing access for consumer for any particular topic
570          * 
571          * @param topicName
572          * @param consumerId
573          * @throws CambriaApiException
574          */
575         @PUT
576         @Path("/{topicName}/consumers/{consumerId}")
577         public void permitConsumerForTopic(@PathParam("topicName") String topicName,
578                         @PathParam("consumerId") String consumerId) throws CambriaApiException {
579                 try {
580                         LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
581
582                         tService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId);
583
584                         LOGGER.info("Read access granted to consumer [" + consumerId + "] for topic " + topicName);
585                 } catch (AccessDeniedException | ConfigDbException | IOException | TopicExistsException excp) {
586                         LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName,
587                                         excp);
588                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
589                                         DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
590                                         "Error while granting read access to consumer [" + consumerId + "] for topic " + topicName
591                                                         + excp.getMessage());
592                         LOGGER.info(errRes.toString());
593                         throw new CambriaApiException(errRes);
594
595                 }
596         }
597
598         /**
599          * Removing access for consumer for any particular topic
600          * 
601          * @param topicName
602          * @param consumerId
603          * @throws CambriaApiException
604          */
605         @DELETE
606         @Path("/{topicName}/consumers/{consumerId}")
607         public void denyConsumerForTopic(@PathParam("topicName") String topicName,
608                         @PathParam("consumerId") String consumerId) throws CambriaApiException {
609                 try {
610                         LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
611
612                         tService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId);
613
614                         LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName);
615                 } catch (ConfigDbException | IOException | TopicExistsException excp) {
616                         LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName,
617                                         excp);
618                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
619                                         DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
620                                         "Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName
621                                                         + excp.getMessage());
622                         LOGGER.info(errRes.toString());
623                         throw new CambriaApiException(errRes);
624                 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
625                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
626
627                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
628                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
629                                         errorMessages.getCreateTopicFail() + excp.getMessage());
630                         LOGGER.info(errRes.toString());
631                         throw new CambriaApiException(errRes);
632
633                 }
634         }
635
636 }