Merge "sonar critical for errorhandling"
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / service / impl / TopicServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 /**
23  * 
24  */
25 package com.att.nsa.cambria.service.impl;
26
27 import java.io.IOException;
28
29 import org.apache.http.HttpStatus;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import org.json.JSONArray;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.springframework.beans.factory.annotation.Autowired;
36 import org.springframework.stereotype.Service;
37
38 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
39 import com.att.nsa.cambria.CambriaApiException;
40 import com.att.nsa.cambria.beans.DMaaPContext;
41 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
42 import com.att.nsa.cambria.beans.TopicBean;
43 import com.att.nsa.cambria.constants.CambriaConstants;
44 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
45 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
46 import com.att.nsa.cambria.exception.DMaaPResponseCode;
47 import com.att.nsa.cambria.exception.ErrorResponse;
48 import com.att.nsa.cambria.metabroker.Broker;
49 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
50 import com.att.nsa.cambria.metabroker.Topic;
51 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
52 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
53 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
54 import com.att.nsa.cambria.service.TopicService;
55 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.security.NsaAcl;
58 import com.att.nsa.security.NsaApiKey;
59 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
60
61 /**
62  * @author author
63  *
64  */
65 @Service
66 public class TopicServiceImpl implements TopicService {
67
68         //private static final Logger LOGGER = Logger.getLogger(TopicServiceImpl.class);
69         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
70         @Autowired
71         private DMaaPErrorMessages errorMessages;
72         
73         
74         
75         //@Value("${msgRtr.topicfactory.aaf}")
76         //private String mrFactory;
77         
78         
79         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
80                 this.errorMessages = errorMessages;
81         }
82
83         /**
84          * @param dmaapContext
85          * @throws JSONException
86          * @throws ConfigDbException
87          * @throws IOException
88          * 
89          */
90         @Override
91         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
92
93                 LOGGER.info("Fetching list of all the topics.");
94                 JSONObject json = new JSONObject();
95
96                 JSONArray topicsList = new JSONArray();
97
98                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
99                         topicsList.put(topic.getName());
100                 }
101
102                 json.put("topics", topicsList);
103
104                 LOGGER.info("Returning list of all the topics.");
105                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
106
107         }
108
109         /**
110          * @param dmaapContext
111          * @throws JSONException
112          * @throws ConfigDbException
113          * @throws IOException
114          * 
115          */
116         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
117
118                 LOGGER.info("Fetching list of all the topics.");
119                 JSONObject json = new JSONObject();
120
121                 JSONArray topicsList = new JSONArray();
122
123                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
124                         JSONObject obj = new JSONObject();
125                         obj.put("topicName", topic.getName());
126                         //obj.put("description", topic.getDescription());
127                         obj.put("owner", topic.getOwner());
128                         obj.put("txenabled", topic.isTransactionEnabled());
129                         topicsList.put(obj);
130                 }
131
132                 json.put("topics", topicsList);
133
134                 LOGGER.info("Returning list of all the topics.");
135                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
136
137         }
138
139         
140         /**
141          * @param dmaapContext
142          * @param topicName
143          * @throws ConfigDbException
144          * @throws IOException
145          * @throws TopicExistsException
146          */
147         @Override
148         public void getTopic(DMaaPContext dmaapContext, String topicName)
149                         throws ConfigDbException, IOException, TopicExistsException {
150
151                 LOGGER.info("Fetching details of topic " + topicName);
152                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
153
154                 if (null == t) {
155                         LOGGER.error("Topic [" + topicName + "] does not exist.");
156                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
157                 }
158
159                 JSONObject o = new JSONObject();
160                 o.put ( "name", t.getName () );
161                 o.put ( "description", t.getDescription () );
162                 
163                 if (null!=t.getOwners ())
164                 o.put ( "owner", t.getOwners ().iterator ().next () );
165                 if(null!=t.getReaderAcl ())
166                 o.put ( "readerAcl", aclToJson ( t.getReaderAcl () ) );
167                 if(null!=t.getWriterAcl ())
168                 o.put ( "writerAcl", aclToJson ( t.getWriterAcl () ) );
169         
170                 LOGGER.info("Returning details of topic " + topicName);
171                 DMaaPResponseBuilder.respondOk(dmaapContext, o);
172
173         }
174
175         
176         /**
177          * @param dmaapContext
178          * @param topicBean
179          * @throws CambriaApiException
180          * @throws AccessDeniedException
181          * @throws IOException
182          * @throws TopicExistsException
183          * @throws JSONException
184          * 
185          * 
186          * 
187          */
188         @Override
189         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
190                         throws CambriaApiException, DMaaPAccessDeniedException,IOException, TopicExistsException {
191
192                 LOGGER.info("Creating topic " + topicBean.getTopicName());
193                 
194                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
195                 String key = null;
196                 String appName=dmaapContext.getRequest().getHeader("AppName");
197                 String enfTopicName= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
198                 if(user != null)
199                 {
200                         key = user.getKey();
201                         
202                         if(  enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >=0 ) {
203                                 
204                                 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
205                                 
206                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
207                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
208                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
209                                 LOGGER.info(errRes.toString());
210                                 throw new DMaaPAccessDeniedException(errRes);
211                                 
212                         }
213                 }
214                                 
215                 //else if (user==null && (null==dmaapContext.getRequest().getHeader("Authorization") && null == dmaapContext.getRequest().getHeader("cookie")) ) {
216                         /*else if (user == null &&  null==dmaapContext.getRequest().getHeader("Authorization")     && 
217                                          (null == appName  &&  null == dmaapContext.getRequest().getHeader("cookie"))) {
218                         LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
219                         
220                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
221                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
222                                         errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
223                         LOGGER.info(errRes.toString());
224                         throw new DMaaPAccessDeniedException(errRes);
225                 }*/
226                 
227                 if (user == null &&  (null!=dmaapContext.getRequest().getHeader("Authorization"))) {
228                         //if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || null != dmaapContext.getRequest().getHeader("cookie"))) {
229                          // ACL authentication is not provided so we will use the aaf authentication
230                         LOGGER.info("Authorization the topic");
231                 
232                         String permission = "";
233                         String nameSpace="";
234                         if(topicBean.getTopicName().indexOf(".")>1)
235                          nameSpace = topicBean.getTopicName().substring(0,topicBean.getTopicName().lastIndexOf("."));
236                 
237                          String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
238                 
239                         //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
240                         
241                         permission = mrFactoryVal+nameSpace+"|create";
242                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
243                         
244                         if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
245                         {
246                                 
247                                 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
248                                 
249                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
250                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
251                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
252                                 LOGGER.info(errRes.toString());
253                                 throw new DMaaPAccessDeniedException(errRes);
254                                 
255                         }else{
256                                 // if user is null and aaf authentication is ok then key should be ""
257                                 //key = "";
258                                 /**
259                                  * Added as part of AAF user it should return username
260                                  */
261                                 
262                                 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
263                                 LOGGER.info("key ==================== "+key);
264                                 
265                         }
266                 }
267
268                 try {
269                         final String topicName = topicBean.getTopicName();
270                         final String desc = topicBean.getTopicDescription();
271
272                         final  int partitions = topicBean.getPartitionCount();
273                 
274                         final int replicas = topicBean.getReplicationCount();
275                         boolean transactionEnabled = topicBean.isTransactionEnabled();
276                         
277
278                         final Broker metabroker = getMetaBroker(dmaapContext);
279                         final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas,
280                                         transactionEnabled);
281
282                         LOGGER.info("Topic created successfully. Sending response");
283                         DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
284                 } catch (JSONException excp) {
285                         
286                         LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
287                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, 
288                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), 
289                                         errorMessages.getIncorrectJson());
290                         LOGGER.info(errRes.toString());
291                         throw new CambriaApiException(errRes);
292                         
293                 }
294         }
295
296         /**
297          * @param dmaapContext
298          * @param topicName
299          * @throws ConfigDbException
300          * @throws IOException
301          * @throws TopicExistsException
302          * @throws CambriaApiException
303          * @throws AccessDeniedException
304          */
305         @Override
306         public void deleteTopic(DMaaPContext dmaapContext, String topicName)
307                         throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
308
309                 LOGGER.info("Deleting topic " + topicName);
310                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
311
312                 if (user == null && null!=dmaapContext.getRequest().getHeader("Authorization")) {
313                         LOGGER.info("Authenticating the user, as ACL authentication is not provided");
314 //                      String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
315                         String permission = "";
316                         String nameSpace="";
317                         nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
318                          String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
319 //                      String tokens[] = topicName.split(".mr.topic.");
320                         permission = mrFactoryVal+nameSpace+"|destroy";
321                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
322                         if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
323                         {
324                                 LOGGER.error("Failed to delete topi"+topicName+". Authentication failed.");
325                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
326                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
327                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" delete "+errorMessages.getNotPermitted2());
328                                 LOGGER.info(errRes.toString());
329                                 throw new DMaaPAccessDeniedException(errRes);
330                         }
331                         
332                         
333                 }
334
335                 final Broker metabroker = getMetaBroker(dmaapContext);
336                 final Topic topic = metabroker.getTopic(topicName);
337
338                 if (topic == null) {
339                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
340                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
341                 }
342
343                 metabroker.deleteTopic(topicName);
344
345                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
346                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
347
348         }
349
350         /**
351          * 
352          * @param dmaapContext
353          * @return
354          */
355         private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
356                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
357         }
358
359         /**
360          * @param dmaapContext
361          * @param topicName
362          * @throws ConfigDbException
363          * @throws IOException
364          * @throws TopicExistsException
365          * 
366          */
367         @Override
368         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
369                         throws ConfigDbException, IOException, TopicExistsException {
370                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
371                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
372
373                 if (topic == null) {
374                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
375                         throw new TopicExistsException(
376                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
377                 }
378                 
379                 
380
381                 final NsaAcl acl = topic.getWriterAcl();
382
383                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
384                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
385
386         }
387
388         /**
389          * 
390          * @param acl
391          * @return
392          */
393         private static JSONObject aclToJson(NsaAcl acl) {
394                 final JSONObject o = new JSONObject();
395                 if (acl == null) {
396                         o.put("enabled", false);
397                         o.put("users", new JSONArray());
398                 } else {
399                         o.put("enabled", acl.isActive());
400
401                         final JSONArray a = new JSONArray();
402                         for (String user : acl.getUsers()) {
403                                 a.put(user);
404                         }
405                         o.put("users", a);
406                 }
407                 return o;
408         }
409
410         /**
411          * @param dmaapContext
412          * @param topicName
413          */
414         @Override
415         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
416                         throws IOException, ConfigDbException, TopicExistsException {
417                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
418                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
419
420                 if (topic == null) {
421                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
422                         throw new TopicExistsException(
423                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
424                 }
425
426                 final NsaAcl acl = topic.getReaderAcl();
427
428                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
429                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
430
431         }
432
433         /**
434          * 
435          * @param t
436          * @return
437          */
438         private static JSONObject topicToJson(Topic t) {
439                 final JSONObject o = new JSONObject();
440
441                 o.put("name", t.getName());
442                 o.put("description", t.getDescription());
443                 o.put("owner", t.getOwner());
444                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
445                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
446
447                 return o;
448         }
449
450         /**
451          * @param dmaapContext
452          * @param topicName
453          * @param producerId
454          * @throws ConfigDbException
455          * @throws IOException
456          * @throws TopicExistsException
457          * @throws AccessDeniedException
458          * @throws  
459          * 
460          */
461         @Override
462         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
463                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException {
464
465                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
466                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
467                 
468 //              if (user == null) {
469 //                      
470 //                      LOGGER.info("Authenticating the user, as ACL authentication is not provided");
471 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
472 //                      
473 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
474 //                      String permission = aaf.aafPermissionString(topicName, "manage");
475 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
476 //                      {
477 //                              LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic " + topicName
478 //                                                                      + ". Authentication failed.");
479 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
480 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
481 //                                              errorMessages.getNotPermitted1()+" <Grant publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
482 //                              LOGGER.info(errRes);
483 //                              throw new DMaaPAccessDeniedException(errRes);
484 //                      }
485 //              }
486
487                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
488
489                 if (null == topic) {
490                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
491                                         + "] does not exist.");
492                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
493                                         + "] for topic. Topic [" + topicName + "] does not exist.");
494                 }
495
496                 topic.permitWritesFromUser(producerId, user);
497
498                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
499                                 + "]. Sending response.");
500                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
501
502         }
503
504         /**
505          * @param dmaapContext
506          * @param topicName
507          * @param producerId
508          * @throws ConfigDbException
509          * @throws IOException
510          * @throws TopicExistsException
511          * @throws AccessDeniedException
512          * @throws DMaaPAccessDeniedException 
513          * 
514          */
515         @Override
516         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
517                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
518
519                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
520                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
521 //              if (user == null) {
522 //                      
523 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
524 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
525 //                      String permission = aaf.aafPermissionString(topicName, "manage");
526 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
527 //                      {
528 //                              LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic " + topicName
529 //                                              + ". Authentication failed.");
530 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
531 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
532 //                                              errorMessages.getNotPermitted1()+" <Revoke publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
533 //                              LOGGER.info(errRes);
534 //                              throw new DMaaPAccessDeniedException(errRes);
535 //                              
536 //                      }
537 //              }
538
539                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
540
541                 if (null == topic) {
542                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
543                                         + "] does not exist.");
544                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
545                                         + "] for topic. Topic [" + topicName + "] does not exist.");
546                 }
547
548                 topic.denyWritesFromUser(producerId, user);
549
550                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
551                                 + "]. Sending response.");
552                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
553
554         }
555
556         /**
557          * @param dmaapContext
558          * @param topicName
559          * @param consumerId
560          * @throws DMaaPAccessDeniedException 
561          */
562         @Override
563         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
564                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
565
566                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
567                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
568 //              if (user == null) {
569 //                      
570 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
571 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
572 //                      String permission = aaf.aafPermissionString(topicName, "manage");
573 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
574 //                      {
575 //                              LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic " + topicName
576 //                                              + ". Authentication failed.");
577 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
578 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
579 //                                              errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
580 //                              LOGGER.info(errRes);
581 //                              throw new DMaaPAccessDeniedException(errRes);
582 //                      }
583 //              }
584
585                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
586
587                 if (null == topic) {
588                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
589                                         + "] does not exist.");
590                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
591                                         + "] for topic. Topic [" + topicName + "] does not exist.");
592                 }
593
594                 topic.permitReadsByUser(consumerId, user);
595
596                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
597                                 + "]. Sending response.");
598                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
599                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
600         }
601
602         /**
603          * @param dmaapContext
604          * @param topicName
605          * @param consumerId
606          * @throws DMaaPAccessDeniedException 
607          */
608         @Override
609         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
610                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
611
612                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
613                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
614 //              if (user == null) {
615 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
616 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
617 //                      String permission = aaf.aafPermissionString(topicName, "manage");
618 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
619 //                      {
620 //                              LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic " + topicName
621 //                                              + ". Authentication failed.");
622 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
623 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
624 //                                              errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
625 //                              LOGGER.info(errRes);
626 //                              throw new DMaaPAccessDeniedException(errRes);
627 //                      }
628 //                      
629 //                      
630 //              }
631
632                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
633
634                 if (null == topic) {
635                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
636                                         + "] does not exist.");
637                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
638                                         + "] for topic. Topic [" + topicName + "] does not exist.");
639                 }
640
641                 topic.denyReadsByUser(consumerId, user);
642
643                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
644                                 + "]. Sending response.");
645                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
646                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
647
648         }
649
650
651         
652         
653         
654 }