42049d2a5e6c48d323bb8efd254b79934b6ed253
[dmaap/messagerouter/messageservice.git] / src / main / java / com / att / nsa / dmaap / service / TopicRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.dmaap.service;
23
24 import java.io.IOException;
25 import javax.servlet.http.HttpServletRequest;
26 import javax.servlet.http.HttpServletResponse;
27 import javax.ws.rs.Consumes;
28 import javax.ws.rs.DELETE;
29 import javax.ws.rs.GET;
30 import javax.ws.rs.POST;
31 import javax.ws.rs.PUT;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import javax.ws.rs.core.MediaType;
37
38 import org.apache.http.HttpStatus;
39
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42
43 import org.json.JSONException;
44 import org.springframework.beans.factory.annotation.Autowired;
45 import org.springframework.beans.factory.annotation.Qualifier;
46 import org.springframework.beans.factory.annotation.Value;
47 import org.springframework.stereotype.Component;
48 import org.onap.dmaap.dmf.mr.CambriaApiException;
49 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
50 import org.onap.dmaap.dmf.mr.beans.TopicBean;
51 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
52 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
53 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
54 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
55 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
56 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
57 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
58 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
59 import org.onap.dmaap.dmf.mr.service.TopicService;
60 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
61 import com.att.nsa.configs.ConfigDbException;
62 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
63
64 /**
65  * This class is a CXF REST service which acts 
66  * as gateway for MR Topic Service.
67  * @author Ramkumar Sembaiyan
68  *
69  */
70
71 @Component
72 @Path("/")
73 public class TopicRestService {
74
75         /**
76          * Logger obj
77          */
78         //private static final Logger LOGGER = Logger           .getLogger(TopicRestService.class);
79         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);
80         /**
81          * Config Reader
82          */
83         @Autowired
84         @Qualifier("configurationReader")
85         private ConfigurationReader configReader;
86
87         /**
88          * HttpServletRequest obj
89          */
90         @Context
91         private HttpServletRequest request;
92
93         /**
94          * HttpServletResponse obj
95          */
96         @Context
97         private HttpServletResponse response;
98
99         /**
100          * TopicService obj
101          */
102         @Autowired
103         private TopicService topicService;
104         
105         /**
106          * DMaaPErrorMessages obj
107          */
108         @Autowired
109         private DMaaPErrorMessages errorMessages;
110         
111         /**
112          * mrNamespace
113          */
114         //@Value("${msgRtr.namespace.aaf}")
115 //      private String mrNamespace;
116         
117
118         /**
119          * Fetches a list of topics from the current kafka instance and converted
120          * into json object.
121          * 
122          * @return list of the topics in json format
123          * @throws AccessDeniedException 
124          * @throws CambriaApiException 
125          * @throws IOException
126          * @throws JSONException
127          * */
128         @GET
129         //@Produces(MediaType.TEXT_PLAIN)
130         public void getTopics() throws CambriaApiException {
131                 try {
132                                                 
133                         LOGGER.info("Authenticating the user before fetching the topics");
134                         //String permission = "com.att.dmaap.mr.topic|*|view";
135                         String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
136                         String permission =mrNameS+"|"+"*"+"|"+"view";
137                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
138                         //Check if client is using AAF CADI Basic Authorization
139                         //If yes then check for AAF role authentication else display all topics 
140                         if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
141                         {
142                                 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
143                                 {
144                                 
145                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
146                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
147                                                         errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
148                                         LOGGER.info(errRes.toString());
149                                         throw new DMaaPAccessDeniedException(errRes);
150                                         
151                                 
152                                 }
153                         }       
154                         
155                                 LOGGER.info("Fetching all Topics");
156                                         //topicService = new com.att.dmf.mr.service.impl.TopicServiceImpl();
157                                 topicService.getTopics(getDmaapContext());
158         
159                                 LOGGER.info("Returning List of all Topics");
160                         
161                         
162                 } catch (JSONException | ConfigDbException | IOException excp) {
163                         LOGGER.error(
164                                         "Failed to retrieve list of all topics: "
165                                                         + excp.getMessage(), excp);
166                         
167                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
168                                         DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), 
169                                         errorMessages.getTopicsfailure()+ excp.getMessage());
170                         LOGGER.info(errRes.toString());
171                         throw new CambriaApiException(errRes);
172                         
173
174                 }
175         }
176
177         /**
178          * Fetches a list of topics from the current kafka instance and converted
179          * into json object.
180          * 
181          * @return list of the topics in json format
182          * @throws AccessDeniedException 
183          * @throws CambriaApiException 
184          * @throws IOException
185          * @throws JSONException
186          * */
187         @GET
188         @Path("/listAll")
189         //@Produces(MediaType.TEXT_PLAIN)
190         public void getAllTopics() throws CambriaApiException {
191                 try {
192                                                 
193                         LOGGER.info("Authenticating the user before fetching the topics");
194                         //String permission = "com.att.dmaap.mr.topic|*|view";
195                         String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
196                         String permission =mrNameS+"|"+"*"+"|"+"view";
197                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
198                         //Check if client is using AAF CADI Basic Authorization
199                         //If yes then check for AAF role authentication else display all topics 
200                         if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
201                         {
202                                 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
203                                 {
204                                 
205                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
206                                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
207                                                         errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
208                                         LOGGER.info(errRes.toString());
209                                         throw new DMaaPAccessDeniedException(errRes);
210                                         
211                                 
212                                 }
213                         }       
214                         
215                                 LOGGER.info("Fetching all Topics");
216                                 
217                                 topicService.getAllTopics(getDmaapContext());
218         
219                                 LOGGER.info("Returning List of all Topics");
220                         
221                         
222                 } catch (JSONException | ConfigDbException | IOException excp) {
223                         LOGGER.error(
224                                         "Failed to retrieve list of all topics: "
225                                                         + excp.getMessage(), excp);
226                         
227                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
228                                         DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(), 
229                                         errorMessages.getTopicsfailure()+ excp.getMessage());
230                         LOGGER.info(errRes.toString());
231                         throw new CambriaApiException(errRes);
232                         
233
234                 }
235         }
236
237         
238         /**
239          * Returns details of the topic whose name is passed as a parameter
240          * 
241          * @param topicName
242          *            - name of the topic
243          * @return details of a topic whose name is mentioned in the request in json
244          *         format.
245          * @throws AccessDeniedException 
246          * @throws DMaaPAccessDeniedException 
247          * @throws IOException
248          * */
249         @GET
250         @Path("/{topicName}")
251         //@Produces(MediaType.TEXT_PLAIN)
252         public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
253                 try {
254                         
255                         LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName);
256                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
257                         
258                         //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
259                         
260                         //Check if client is using AAF CADI Basic Authorization
261                         //If yes then check for AAF role authentication else display all topics 
262                         if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
263                         {
264                                 String permission = aaf.aafPermissionString(topicName, "view");
265                                 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
266                                 {
267                                                 
268                                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
269                                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
270                                                                 errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
271                                                 LOGGER.info(errRes.toString());
272                                                 throw new DMaaPAccessDeniedException(errRes);
273                                         }
274                         } 
275                         
276                                 LOGGER.info("Fetching Topic: " + topicName);
277                                 
278                                 topicService.getTopic(getDmaapContext(), topicName);
279
280                                 LOGGER.info("Fetched details of topic: " + topicName);
281                 
282                 } catch (ConfigDbException | IOException | TopicExistsException excp) {
283                         LOGGER.error("Failed to retrieve details of topic: " + topicName,
284                                         excp);
285                         
286                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
287                                         DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(), 
288                                         errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
289                         LOGGER.info(errRes.toString());
290                         throw new CambriaApiException(errRes);
291                         
292                         
293                 }
294         }
295
296         
297         
298         /**
299          * This method is still not working. Need to check on post call and how to
300          * accept parameters for post call
301          * 
302          * @param topicBean
303          *            it will have the bean object
304          * @throws TopicExistsException
305          * @throws CambriaApiException
306          * @throws JSONException 
307          * @throws IOException
308          * @throws AccessDeniedException
309          * 
310          * */
311         @POST
312         @Path("/create")
313         @Consumes({ MediaType.APPLICATION_JSON })
314         //@Produces(MediaType.TEXT_PLAIN)
315         public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
316         try {
317                         LOGGER.info("Creating Topic."+topicBean.getTopicName());
318                 
319                         topicService.createTopic(getDmaapContext(), topicBean);
320
321                         LOGGER.info("Topic created Successfully.");
322                 } 
323         catch (TopicExistsException ex){
324                 
325                 LOGGER.error("Error while creating a topic: " + ex.getMessage(),
326                                 ex);
327                 
328                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT, 
329                                 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
330                                 errorMessages.getCreateTopicFail()+ ex.getMessage());
331                 LOGGER.info(errRes.toString());
332                 throw new CambriaApiException(errRes);
333                 
334                 
335         
336         
337                 }catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
338                         LOGGER.error("Error while creating a topic: " + excp.getMessage(),
339                                         excp);
340                         
341                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
342                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
343                                         errorMessages.getCreateTopicFail()+ excp.getMessage());
344                         LOGGER.info(errRes.toString());
345                         throw new CambriaApiException(errRes);
346                         
347                 }catch (CambriaApiException |  IOException                       excp) {
348                         LOGGER.error("Error while creating a topic: " + excp.getMessage(),
349                                         excp);
350                         
351                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
352                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
353                                         errorMessages.getCreateTopicFail()+ excp.getMessage());
354                         LOGGER.info(errRes.toString());
355                         throw new CambriaApiException(errRes);
356                         
357                 }
358         }
359
360         /**
361          * Deletes existing topic whose name is passed as a parameter
362          * 
363          * @param topicName
364          *            topic
365          * @throws CambriaApiException 
366          * @throws IOException
367          * */
368         @DELETE
369         @Path("/{topicName}")
370         //@Produces(MediaType.TEXT_PLAIN)
371         public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
372                 try {
373                         LOGGER.info("Deleting Topic: " + topicName);
374
375                         topicService.deleteTopic(getDmaapContext(), topicName);
376
377                         LOGGER.info("Topic [" + topicName + "] deleted successfully.");
378                 } catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
379                         LOGGER.error("Error while creating a topic: " + excp.getMessage(),
380                                         excp);
381                         
382                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
383                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
384                                         errorMessages.getCreateTopicFail()+ excp.getMessage());
385                         LOGGER.info(errRes.toString());
386                         throw new CambriaApiException(errRes);
387                         
388         }catch (IOException | ConfigDbException
389                                 | CambriaApiException | TopicExistsException excp) {
390                         LOGGER.error("Error while deleting topic: " + topicName, excp);
391                 
392                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
393                                         DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(), 
394                                         errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
395                         LOGGER.info(errRes.toString());
396                         throw new CambriaApiException(errRes);
397                         
398                 }
399         }
400
401         private DMaaPContext getDmaapContext() {
402
403                 DMaaPContext dmaapContext = new DMaaPContext();
404                 dmaapContext.setRequest(request);
405                 dmaapContext.setResponse(response);
406                 dmaapContext.setConfigReader(configReader);
407
408                 return dmaapContext;
409
410         }
411
412         /**
413          * This method will fetch the details of publisher by giving topic name
414          * 
415          * @param topicName
416          * @throws CambriaApiException 
417          * @throws AccessDeniedException 
418          */
419         @GET
420         @Path("/{topicName}/producers")
421         //@Produces(MediaType.TEXT_PLAIN)
422         public void getPublishersByTopicName(
423                         @PathParam("topicName") String topicName) throws CambriaApiException {
424                 try {
425                         
426 //                      String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
427 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
428 //                      String permission = aaf.aafPermissionString(topicName, "view");
429 //                      if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
430 //                      {
431                                 LOGGER.info("Fetching list of all the publishers for topic "
432                                                 + topicName);
433
434                                 topicService.getPublishersByTopicName(getDmaapContext(), topicName);
435
436                                 LOGGER.info("Returning list of all the publishers for topic "
437                                                 + topicName);
438 //                      }else{
439 //                              LOGGER.error("Error while fetching list of publishers for topic "+ topicName);
440 //                              
441 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
442 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
443 //                                              errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2());
444 //                              LOGGER.info(errRes);
445 //                              throw new DMaaPAccessDeniedException(errRes);
446 //                              
447 //                      }
448                         
449                 } catch (IOException | ConfigDbException | TopicExistsException excp) {
450                         LOGGER.error("Error while fetching list of publishers for topic "
451                                         + topicName, excp);
452                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
453                                         DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(), 
454                                         "Error while fetching list of publishers for topic: "
455                                                         + topicName + excp.getMessage());
456                         LOGGER.info(errRes.toString());
457                         throw new CambriaApiException(errRes);
458                         
459                 }
460         }
461
462         /**
463          * proving permission for the topic for a particular publisher id
464          * 
465          * @param topicName
466          * @param producerId
467          * @throws CambriaApiException 
468          */
469         @PUT
470         @Path("/{topicName}/producers/{producerId}")
471         public void permitPublisherForTopic(
472                         @PathParam("topicName") String topicName,
473                         @PathParam("producerId") String producerId) throws CambriaApiException {
474                 try {
475                         LOGGER.info("Granting write access to producer [" + producerId
476                                         + "] for topic " + topicName);
477
478                         topicService.permitPublisherForTopic(getDmaapContext(), topicName,
479                                         producerId);
480
481                         LOGGER.info("Write access has been granted to producer ["
482                                         + producerId + "] for topic " + topicName);
483                 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
484                         LOGGER.error("Error while creating a topic: " + excp.getMessage(),
485                                         excp);
486                         
487                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
488                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
489                                         errorMessages.getCreateTopicFail()+ excp.getMessage());
490                         LOGGER.info(errRes.toString());
491                         throw new CambriaApiException(errRes);
492                         
493           }catch ( ConfigDbException | IOException
494                                 | TopicExistsException excp) {
495                         LOGGER.error("Error while granting write access to producer ["
496                                         + producerId + "] for topic " + topicName, excp);
497                         
498                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND, 
499                                         DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(), 
500                                         "Error while granting write access to producer ["
501                                                         + producerId + "] for topic " + topicName + excp.getMessage());
502                         LOGGER.info(errRes.toString());
503                         throw new CambriaApiException(errRes);
504                         
505                 }
506         }
507
508         /**
509          * Removing access for a publisher id for any particular topic
510          * 
511          * @param topicName
512          * @param producerId
513          * @throws CambriaApiException 
514          */
515         @DELETE
516         @Path("/{topicName}/producers/{producerId}")
517         public void denyPublisherForTopic(@PathParam("topicName") String topicName,
518                         @PathParam("producerId") String producerId) throws CambriaApiException {
519                 try {
520                         LOGGER.info("Revoking write access to producer [" + producerId
521                                         + "] for topic " + topicName);
522
523                         topicService.denyPublisherForTopic(getDmaapContext(), topicName,
524                                         producerId);
525
526                         LOGGER.info("Write access revoked for producer [" + producerId
527                                         + "] for topic " + topicName);
528                 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
529                         LOGGER.error("Error while creating a topic: " + excp.getMessage(),
530                                         excp);
531                         
532                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
533                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
534                                         errorMessages.getCreateTopicFail()+ excp.getMessage());
535                         LOGGER.info(errRes.toString());
536                         throw new CambriaApiException(errRes);
537                         
538         }catch ( ConfigDbException | IOException
539                                 | TopicExistsException excp) {
540                         LOGGER.error("Error while revoking write access for producer ["
541                                         + producerId + "] for topic " + topicName, excp);
542                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
543                                         DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(), 
544                                         "Error while revoking write access to producer ["
545                                                         + producerId + "] for topic " + topicName + excp.getMessage());
546                         LOGGER.info(errRes.toString());
547                         throw new CambriaApiException(errRes);
548                 }
549         }
550
551         /**
552          * Get the consumer details by the topic name
553          * 
554          * @param topicName
555          * @throws AccessDeniedException 
556          * @throws CambriaApiException 
557          */
558         @GET
559         @Path("/{topicName}/consumers")
560         //@Produces(MediaType.TEXT_PLAIN)
561         public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException, 
562         CambriaApiException {
563                 try {
564                         
565                         
566 //                      String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
567 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
568 //                      String permission = aaf.aafPermissionString(topicName, "view");
569 //                      if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
570 //                      {
571                                 LOGGER.info("Fetching list of all consumers for topic " + topicName);
572
573                                 topicService.getConsumersByTopicName(getDmaapContext(), topicName);
574
575                                 LOGGER.info("Returning list of all consumers for topic "
576                                                 + topicName);
577                                 
578 //                      }else{
579 //                              LOGGER.error(
580 //                                              "Error while fetching list of all consumers for topic "
581 //                                                              + topicName);
582 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
583 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
584 //                                              errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2());
585 //                              LOGGER.info(errRes);
586 //                              throw new DMaaPAccessDeniedException(errRes);
587 //                              
588 //                              
589 //                      }
590                         
591                         
592                         
593                 } catch (IOException | ConfigDbException | TopicExistsException excp) {
594                         LOGGER.error(
595                                         "Error while fetching list of all consumers for topic "
596                                                         + topicName, excp);
597                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
598                                         DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(), 
599                                         "Error while fetching list of all consumers for topic: "
600                                                         + topicName+ excp.getMessage());
601                         LOGGER.info(errRes.toString());
602                         throw new CambriaApiException(errRes);
603                         
604                 }
605         }
606
607         /**
608          * providing access for consumer for any particular topic
609          * 
610          * @param topicName
611          * @param consumerId
612          * @throws CambriaApiException 
613          */
614         @PUT
615         @Path("/{topicName}/consumers/{consumerId}")
616         public void permitConsumerForTopic(
617                         @PathParam("topicName") String topicName,
618                         @PathParam("consumerId") String consumerId) throws CambriaApiException {
619                 try {
620                         LOGGER.info("Granting read access to consumer [" + consumerId
621                                         + "] for topic " + topicName);
622
623                         topicService.permitConsumerForTopic(getDmaapContext(), topicName,
624                                         consumerId);
625
626                         LOGGER.info("Read access granted to consumer [" + consumerId
627                                         + "] for topic " + topicName);
628                 } catch (AccessDeniedException | ConfigDbException | IOException
629                                 | TopicExistsException excp) {
630                         LOGGER.error("Error while granting read access to consumer ["
631                                         + consumerId + "] for topic " + topicName, excp);
632                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
633                                         DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(), 
634                                         "Error while granting read access to consumer ["
635                                                         + consumerId + "] for topic " + topicName+ excp.getMessage());
636                         LOGGER.info(errRes.toString());
637                         throw new CambriaApiException(errRes);
638                         
639                 }
640         }
641
642         /**
643          * Removing access for consumer for any particular topic
644          * 
645          * @param topicName
646          * @param consumerId
647          * @throws CambriaApiException 
648          */
649         @DELETE
650         @Path("/{topicName}/consumers/{consumerId}")
651         public void denyConsumerForTopic(@PathParam("topicName") String topicName,
652                         @PathParam("consumerId") String consumerId) throws CambriaApiException {
653                 try {
654                         LOGGER.info("Revoking read access to consumer [" + consumerId
655                                         + "] for topic " + topicName);
656
657                         topicService.denyConsumerForTopic(getDmaapContext(), topicName,
658                                         consumerId);
659
660                         LOGGER.info("Read access revoked to consumer [" + consumerId
661                                         + "] for topic " + topicName);
662                 } catch ( ConfigDbException | IOException
663                                 | TopicExistsException excp) {
664                         LOGGER.error("Error while revoking read access to consumer ["
665                                         + consumerId + "] for topic " + topicName, excp);
666                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
667                                         DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(), 
668                                         "Error while revoking read access to consumer ["
669                                                         + consumerId + "] for topic " + topicName+ excp.getMessage());
670                         LOGGER.info(errRes.toString());
671                         throw new CambriaApiException(errRes);
672                 }catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
673                         LOGGER.error("Error while creating a topic: " + excp.getMessage(),
674                                         excp);
675                         
676                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
677                                         DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(), 
678                                         errorMessages.getCreateTopicFail()+ excp.getMessage());
679                         LOGGER.info(errRes.toString());
680                         throw new CambriaApiException(errRes);
681                         
682         }
683         }
684
685         public TopicService getTopicService() {
686                 return topicService;
687         }
688
689         public void setTopicService(TopicService topicService) {
690                 this.topicService = topicService;
691         }
692
693
694         
695
696 }