commiting code for test coverage
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / service / impl / TopicServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 /**
23  * 
24  */
25 package com.att.nsa.cambria.service.impl;
26
27 import java.io.IOException;
28
29 import org.apache.http.HttpStatus;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import org.json.JSONArray;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.springframework.beans.factory.annotation.Autowired;
36 import org.springframework.stereotype.Service;
37
38 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
39 import com.att.nsa.cambria.CambriaApiException;
40 import com.att.nsa.cambria.beans.DMaaPContext;
41 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
42 import com.att.nsa.cambria.beans.TopicBean;
43 import com.att.nsa.cambria.constants.CambriaConstants;
44 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
45 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
46 import com.att.nsa.cambria.exception.DMaaPResponseCode;
47 import com.att.nsa.cambria.exception.ErrorResponse;
48 import com.att.nsa.cambria.metabroker.Broker;
49 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
50 import com.att.nsa.cambria.metabroker.Topic;
51 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
52 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
53 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
54 import com.att.nsa.cambria.service.TopicService;
55 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.security.NsaAcl;
58 import com.att.nsa.security.NsaApiKey;
59 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
60
61 /**
62  * @author author
63  *
64  */
65 @Service
66 public class TopicServiceImpl implements TopicService {
67
68         //private static final Logger LOGGER = Logger.getLogger(TopicServiceImpl.class);
69         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
70         @Autowired
71         private DMaaPErrorMessages errorMessages;
72         
73         //@Value("${msgRtr.topicfactory.aaf}")
74         //private String mrFactory;
75         
76         
77         /**
78          * @param dmaapContext
79          * @throws JSONException
80          * @throws ConfigDbException
81          * @throws IOException
82          * 
83          */
84         @Override
85         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
86
87                 LOGGER.info("Fetching list of all the topics.");
88                 JSONObject json = new JSONObject();
89
90                 JSONArray topicsList = new JSONArray();
91
92                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
93                         topicsList.put(topic.getName());
94                 }
95
96                 json.put("topics", topicsList);
97
98                 LOGGER.info("Returning list of all the topics.");
99                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
100
101         }
102
103         /**
104          * @param dmaapContext
105          * @throws JSONException
106          * @throws ConfigDbException
107          * @throws IOException
108          * 
109          */
110         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
111
112                 LOGGER.info("Fetching list of all the topics.");
113                 JSONObject json = new JSONObject();
114
115                 JSONArray topicsList = new JSONArray();
116
117                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
118                         JSONObject obj = new JSONObject();
119                         obj.put("topicName", topic.getName());
120                         //obj.put("description", topic.getDescription());
121                         obj.put("owner", topic.getOwner());
122                         obj.put("txenabled", topic.isTransactionEnabled());
123                         topicsList.put(obj);
124                 }
125
126                 json.put("topics", topicsList);
127
128                 LOGGER.info("Returning list of all the topics.");
129                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
130
131         }
132
133         
134         /**
135          * @param dmaapContext
136          * @param topicName
137          * @throws ConfigDbException
138          * @throws IOException
139          * @throws TopicExistsException
140          */
141         @Override
142         public void getTopic(DMaaPContext dmaapContext, String topicName)
143                         throws ConfigDbException, IOException, TopicExistsException {
144
145                 LOGGER.info("Fetching details of topic " + topicName);
146                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
147
148                 if (null == t) {
149                         LOGGER.error("Topic [" + topicName + "] does not exist.");
150                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
151                 }
152
153                 JSONObject o = new JSONObject();
154                 o.put ( "name", t.getName () );
155                 o.put ( "description", t.getDescription () );
156                 
157                 if (null!=t.getOwners ())
158                 o.put ( "owner", t.getOwners ().iterator ().next () );
159                 if(null!=t.getReaderAcl ())
160                 o.put ( "readerAcl", aclToJson ( t.getReaderAcl () ) );
161                 if(null!=t.getWriterAcl ())
162                 o.put ( "writerAcl", aclToJson ( t.getWriterAcl () ) );
163         
164                 LOGGER.info("Returning details of topic " + topicName);
165                 DMaaPResponseBuilder.respondOk(dmaapContext, o);
166
167         }
168
169         
170         /**
171          * @param dmaapContext
172          * @param topicBean
173          * @throws CambriaApiException
174          * @throws AccessDeniedException
175          * @throws IOException
176          * @throws TopicExistsException
177          * @throws JSONException
178          * 
179          * 
180          * 
181          */
182         @Override
183         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
184                         throws CambriaApiException, DMaaPAccessDeniedException,IOException, TopicExistsException {
185
186                 LOGGER.info("Creating topic " + topicBean.getTopicName());
187                 
188                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
189                 String key = null;
190                 String appName=dmaapContext.getRequest().getHeader("AppName");
191                 String enfTopicName= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
192         
193                 if(user != null)
194                 {
195                         key = user.getKey();
196                         
197                         if(  enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >=0 ) {
198                                 
199                                 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
200                                 
201                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
202                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
203                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
204                                 LOGGER.info(errRes.toString());
205                                 throw new DMaaPAccessDeniedException(errRes);
206                                 
207                         }
208                 }
209                                 
210                 //else if (user==null && (null==dmaapContext.getRequest().getHeader("Authorization") && null == dmaapContext.getRequest().getHeader("cookie")) ) {
211                         else if (user == null &&  null==dmaapContext.getRequest().getHeader("Authorization")     && 
212                                          (null == appName  &&  null == dmaapContext.getRequest().getHeader("cookie"))) {
213                         LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
214                         
215                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
216                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
217                                         errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
218                         LOGGER.info(errRes.toString());
219                         throw new DMaaPAccessDeniedException(errRes);
220                 }
221                 
222                 if (user == null &&  (null!=dmaapContext.getRequest().getHeader("Authorization") ||
223                                          null != dmaapContext.getRequest().getHeader("cookie"))) {
224                         //if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || null != dmaapContext.getRequest().getHeader("cookie"))) {
225                          // ACL authentication is not provided so we will use the aaf authentication
226                         LOGGER.info("Authorization the topic");
227                 
228                         String permission = "";
229                         String nameSpace="";
230                         if(topicBean.getTopicName().indexOf(".")>1)
231                          nameSpace = topicBean.getTopicName().substring(0,topicBean.getTopicName().lastIndexOf("."));
232                 
233                          String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
234                 
235                         //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
236                         
237                         permission = mrFactoryVal+nameSpace+"|create";
238                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
239                         
240                         if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
241                         {
242                                 
243                                 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
244                                 
245                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
246                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
247                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
248                                 LOGGER.info(errRes.toString());
249                                 throw new DMaaPAccessDeniedException(errRes);
250                                 
251                         }else{
252                                 // if user is null and aaf authentication is ok then key should be ""
253                                 //key = "";
254                                 /**
255                                  * Added as part of AAF user it should return username
256                                  */
257                                 
258                                 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
259                                 LOGGER.info("key ==================== "+key);
260                                 
261                         }
262                 }
263
264                 try {
265                         final String topicName = topicBean.getTopicName();
266                         final String desc = topicBean.getTopicDescription();
267
268                         final  int partitions = topicBean.getPartitionCount();
269                 
270                         final int replicas = topicBean.getReplicationCount();
271                         boolean transactionEnabled = topicBean.isTransactionEnabled();
272                         
273
274                         final Broker metabroker = getMetaBroker(dmaapContext);
275                         final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas,
276                                         transactionEnabled);
277
278                         LOGGER.info("Topic created successfully. Sending response");
279                         DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
280                 } catch (JSONException excp) {
281                         
282                         LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
283                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, 
284                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), 
285                                         errorMessages.getIncorrectJson());
286                         LOGGER.info(errRes.toString());
287                         throw new CambriaApiException(errRes);
288                         
289                 }
290         }
291
292         /**
293          * @param dmaapContext
294          * @param topicName
295          * @throws ConfigDbException
296          * @throws IOException
297          * @throws TopicExistsException
298          * @throws CambriaApiException
299          * @throws AccessDeniedException
300          */
301         @Override
302         public void deleteTopic(DMaaPContext dmaapContext, String topicName)
303                         throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
304
305                 LOGGER.info("Deleting topic " + topicName);
306                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
307
308                 if (user == null && null!=dmaapContext.getRequest().getHeader("Authorization")) {
309                         LOGGER.info("Authenticating the user, as ACL authentication is not provided");
310 //                      String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
311                         String permission = "";
312                         String nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
313                          String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
314 //                      String tokens[] = topicName.split(".mr.topic.");
315                         permission = mrFactoryVal+nameSpace+"|destroy";
316                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
317                         if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
318                         {
319                                 LOGGER.error("Failed to delete topi"+topicName+". Authentication failed.");
320                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
321                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
322                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" delete "+errorMessages.getNotPermitted2());
323                                 LOGGER.info(errRes.toString());
324                                 throw new DMaaPAccessDeniedException(errRes);
325                         }
326                         
327                         
328                 }
329
330                 final Broker metabroker = getMetaBroker(dmaapContext);
331                 final Topic topic = metabroker.getTopic(topicName);
332
333                 if (topic == null) {
334                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
335                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
336                 }
337
338                 metabroker.deleteTopic(topicName);
339
340                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
341                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
342
343         }
344
345         /**
346          * 
347          * @param dmaapContext
348          * @return
349          */
350         private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
351                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
352         }
353
354         /**
355          * @param dmaapContext
356          * @param topicName
357          * @throws ConfigDbException
358          * @throws IOException
359          * @throws TopicExistsException
360          * 
361          */
362         @Override
363         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
364                         throws ConfigDbException, IOException, TopicExistsException {
365                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
366                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
367
368                 if (topic == null) {
369                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
370                         throw new TopicExistsException(
371                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
372                 }
373                 
374                 
375
376                 final NsaAcl acl = topic.getWriterAcl();
377
378                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
379                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
380
381         }
382
383         /**
384          * 
385          * @param acl
386          * @return
387          */
388         private static JSONObject aclToJson(NsaAcl acl) {
389                 final JSONObject o = new JSONObject();
390                 if (acl == null) {
391                         o.put("enabled", false);
392                         o.put("users", new JSONArray());
393                 } else {
394                         o.put("enabled", acl.isActive());
395
396                         final JSONArray a = new JSONArray();
397                         for (String user : acl.getUsers()) {
398                                 a.put(user);
399                         }
400                         o.put("users", a);
401                 }
402                 return o;
403         }
404
405         /**
406          * @param dmaapContext
407          * @param topicName
408          */
409         @Override
410         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
411                         throws IOException, ConfigDbException, TopicExistsException {
412                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
413                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
414
415                 if (topic == null) {
416                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
417                         throw new TopicExistsException(
418                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
419                 }
420
421                 final NsaAcl acl = topic.getReaderAcl();
422
423                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
424                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
425
426         }
427
428         /**
429          * 
430          * @param t
431          * @return
432          */
433         private static JSONObject topicToJson(Topic t) {
434                 final JSONObject o = new JSONObject();
435
436                 o.put("name", t.getName());
437                 o.put("description", t.getDescription());
438                 o.put("owner", t.getOwner());
439                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
440                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
441
442                 return o;
443         }
444
445         /**
446          * @param dmaapContext
447          * @param topicName
448          * @param producerId
449          * @throws ConfigDbException
450          * @throws IOException
451          * @throws TopicExistsException
452          * @throws AccessDeniedException
453          * @throws  
454          * 
455          */
456         @Override
457         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
458                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException {
459
460                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
461                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
462                 
463 //              if (user == null) {
464 //                      
465 //                      LOGGER.info("Authenticating the user, as ACL authentication is not provided");
466 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
467 //                      
468 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
469 //                      String permission = aaf.aafPermissionString(topicName, "manage");
470 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
471 //                      {
472 //                              LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic " + topicName
473 //                                                                      + ". Authentication failed.");
474 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
475 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
476 //                                              errorMessages.getNotPermitted1()+" <Grant publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
477 //                              LOGGER.info(errRes);
478 //                              throw new DMaaPAccessDeniedException(errRes);
479 //                      }
480 //              }
481
482                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
483
484                 if (null == topic) {
485                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
486                                         + "] does not exist.");
487                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
488                                         + "] for topic. Topic [" + topicName + "] does not exist.");
489                 }
490
491                 topic.permitWritesFromUser(producerId, user);
492
493                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
494                                 + "]. Sending response.");
495                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
496
497         }
498
499         /**
500          * @param dmaapContext
501          * @param topicName
502          * @param producerId
503          * @throws ConfigDbException
504          * @throws IOException
505          * @throws TopicExistsException
506          * @throws AccessDeniedException
507          * @throws DMaaPAccessDeniedException 
508          * 
509          */
510         @Override
511         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
512                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
513
514                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
515                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
516 //              if (user == null) {
517 //                      
518 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
519 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
520 //                      String permission = aaf.aafPermissionString(topicName, "manage");
521 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
522 //                      {
523 //                              LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic " + topicName
524 //                                              + ". Authentication failed.");
525 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
526 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
527 //                                              errorMessages.getNotPermitted1()+" <Revoke publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
528 //                              LOGGER.info(errRes);
529 //                              throw new DMaaPAccessDeniedException(errRes);
530 //                              
531 //                      }
532 //              }
533
534                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
535
536                 if (null == topic) {
537                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
538                                         + "] does not exist.");
539                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
540                                         + "] for topic. Topic [" + topicName + "] does not exist.");
541                 }
542
543                 topic.denyWritesFromUser(producerId, user);
544
545                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
546                                 + "]. Sending response.");
547                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
548
549         }
550
551         /**
552          * @param dmaapContext
553          * @param topicName
554          * @param consumerId
555          * @throws DMaaPAccessDeniedException 
556          */
557         @Override
558         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
559                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
560
561                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
562                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
563 //              if (user == null) {
564 //                      
565 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
566 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
567 //                      String permission = aaf.aafPermissionString(topicName, "manage");
568 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
569 //                      {
570 //                              LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic " + topicName
571 //                                              + ". Authentication failed.");
572 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
573 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
574 //                                              errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
575 //                              LOGGER.info(errRes);
576 //                              throw new DMaaPAccessDeniedException(errRes);
577 //                      }
578 //              }
579
580                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
581
582                 if (null == topic) {
583                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
584                                         + "] does not exist.");
585                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
586                                         + "] for topic. Topic [" + topicName + "] does not exist.");
587                 }
588
589                 topic.permitReadsByUser(consumerId, user);
590
591                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
592                                 + "]. Sending response.");
593                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
594                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
595         }
596
597         /**
598          * @param dmaapContext
599          * @param topicName
600          * @param consumerId
601          * @throws DMaaPAccessDeniedException 
602          */
603         @Override
604         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
605                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
606
607                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
608                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
609 //              if (user == null) {
610 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
611 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
612 //                      String permission = aaf.aafPermissionString(topicName, "manage");
613 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
614 //                      {
615 //                              LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic " + topicName
616 //                                              + ". Authentication failed.");
617 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
618 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
619 //                                              errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
620 //                              LOGGER.info(errRes);
621 //                              throw new DMaaPAccessDeniedException(errRes);
622 //                      }
623 //                      
624 //                      
625 //              }
626
627                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
628
629                 if (null == topic) {
630                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
631                                         + "] does not exist.");
632                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
633                                         + "] for topic. Topic [" + topicName + "] does not exist.");
634                 }
635
636                 topic.denyReadsByUser(consumerId, user);
637
638                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
639                                 + "]. Sending response.");
640                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
641                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
642
643         }
644
645
646         
647         
648         
649 }