TopicService authorization check refactor
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / TopicServiceImpl.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
8  * =================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *  
21  */
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import com.att.ajsc.beans.PropertiesMapBean;
25 import java.io.IOException;
26
27 import java.security.Principal;
28 import javax.servlet.http.HttpServletRequest;
29 import joptsimple.internal.Strings;
30 import org.apache.commons.lang.StringUtils;
31 import org.apache.commons.lang.math.NumberUtils;
32 import org.apache.http.HttpStatus;
33 import org.json.JSONArray;
34 import org.json.JSONException;
35 import org.json.JSONObject;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Service;
38
39 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
40 import org.onap.dmaap.dmf.mr.CambriaApiException;
41 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
42 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
43 import org.onap.dmaap.dmf.mr.beans.TopicBean;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
47 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
48 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
49 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
50 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
51
52 import org.onap.dmaap.dmf.mr.metabroker.Topic;
53 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
54 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
55 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
56 import org.onap.dmaap.dmf.mr.service.TopicService;
57 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
58 import org.onap.dmaap.dmf.mr.utils.Utils;
59 import com.att.eelf.configuration.EELFLogger;
60 import com.att.eelf.configuration.EELFManager;
61 import com.att.nsa.configs.ConfigDbException;
62 import com.att.nsa.security.NsaAcl;
63 import com.att.nsa.security.NsaApiKey;
64 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
65
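/**
 * {@link TopicService} implementation that handles topic listing, lookup, creation and
 * deletion requests as well as changes to a topic's reader/writer ACLs, using the meta
 * broker obtained from the request's {@link DMaaPContext}.
 *
 * @author muzainulhaque.qazi
 */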
70 @Service
71 public class TopicServiceImpl implements TopicService {
72
73         private static final String TOPIC_CREATE_OP = "create";
74         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
75         @Autowired
76         private DMaaPErrorMessages errorMessages;
77
78         public DMaaPErrorMessages getErrorMessages() {
79                 return errorMessages;
80         }
81
82         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
83                 this.errorMessages = errorMessages;
84         }
85
86
87   String getPropertyFromAJSCbean(String propertyKey) {
88                 return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
89         }
90
91         String getPropertyFromAJSCmap(String propertyKey) {
92                 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
93         }
94
95         NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) {
96                 return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
97         }
98
99         void respondOk(DMaaPContext context, String msg) {
100                 DMaaPResponseBuilder.respondOkWithHtml(context, msg);
101         }
102
103         void respondOk(DMaaPContext context, JSONObject json) throws IOException {
104                 DMaaPResponseBuilder.respondOk(context, json);
105         }
106
107         boolean isCadiEnabled() {
108                 return Utils.isCadiEnabled();
109         }
110         /**
111          * @param dmaapContext
112          * @throws JSONException
113          * @throws ConfigDbException
114          * @throws IOException
115          * 
116          */
117         @Override
118         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
119                 LOGGER.info("Fetching list of all the topics.");
120                 JSONObject json = new JSONObject();
121
122                 JSONArray topicsList = new JSONArray();
123
124                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
125                         topicsList.put(topic.getName());
126                 }
127
128                 json.put("topics", topicsList);
129
130                 LOGGER.info("Returning list of all the topics.");
131                 respondOk(dmaapContext, json);
132
133         }
134
135         /**
136          * @param dmaapContext
137          * @throws JSONException
138          * @throws ConfigDbException
139          * @throws IOException
140          * 
141          */
142         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
143
144                 LOGGER.info("Fetching list of all the topics.");
145                 JSONObject json = new JSONObject();
146
147                 JSONArray topicsList = new JSONArray();
148
149                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
150                         JSONObject obj = new JSONObject();
151                         obj.put("topicName", topic.getName());
152                         
153                         obj.put("owner", topic.getOwner());
154                         obj.put("txenabled", topic.isTransactionEnabled());
155                         topicsList.put(obj);
156                 }
157
158                 json.put("topics", topicsList);
159
160                 LOGGER.info("Returning list of all the topics.");
161                 respondOk(dmaapContext, json);
162
163         }
164
165         /**
166          * @param dmaapContext
167          * @param topicName
168          * @throws ConfigDbException
169          * @throws IOException
170          * @throws TopicExistsException
171          */
172         @Override
173         public void getTopic(DMaaPContext dmaapContext, String topicName)
174                         throws ConfigDbException, IOException, TopicExistsException {
175
176                 LOGGER.info("Fetching details of topic " + topicName);
177                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
178
179                 if (null == t) {
180                         LOGGER.error("Topic [" + topicName + "] does not exist.");
181                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
182                 }
183
184                 JSONObject o = new JSONObject();
185                 o.put("name", t.getName());
186                 o.put("description", t.getDescription());
187
188                 if (null != t.getOwners())
189                         o.put("owner", t.getOwners().iterator().next());
190                 if (null != t.getReaderAcl())
191                         o.put("readerAcl", aclToJson(t.getReaderAcl()));
192                 if (null != t.getWriterAcl())
193                         o.put("writerAcl", aclToJson(t.getWriterAcl()));
194
195                 LOGGER.info("Returning details of topic " + topicName);
196                 respondOk(dmaapContext, o);
197
198         }
199
200         /**
201          * @param dmaapContext
202          * @param topicBean
203          * @throws CambriaApiException
204          * @throws AccessDeniedException
205          * @throws IOException
206          * @throws TopicExistsException
207          * @throws JSONException
208          * 
209          * 
210          * 
211          */
212         @Override
213         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException {
214                 String topicName = topicBean.getTopicName();
215                 LOGGER.info("Creating topic {}",topicName);
216                 String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
217
218                 try {
219                         final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions");
220                         final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas");
221
222                         final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
223                                 key, partitions, replicas, topicBean.isTransactionEnabled());
224
225                         LOGGER.info("Topic {} created successfully. Sending response", topicName);
226                         respondOk(dmaapContext, topicToJson(t));
227                 } catch (JSONException ex) {
228
229                         LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
230                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
231                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
232                         LOGGER.info(errRes.toString());
233                         throw new CambriaApiException(errRes);
234
235                 } catch (ConfigDbException ex) {
236
237                         LOGGER.error("Failed to create topic "+ topicName +".  Config DB Exception", ex);
238                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
239                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
240                         LOGGER.info(errRes.toString());
241                         throw new CambriaApiException(errRes);
242                 } catch (Broker1.TopicExistsException ex) {
243                         LOGGER.error( "Failed to create topic "+ topicName +".  Topic already exists.",ex);
244                 }
245         }
246
247         private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException {
248                 String clientId = Strings.EMPTY;
249                 if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) {
250                         LOGGER.info("Performing AAF authorization for topic {} creation.", topicName);
251                         String permission = buildPermission(topicName, operation);
252                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
253                         clientId = getAAFclientId(dmaapContext.getRequest());
254
255                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
256                                 LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}",
257                                         operation, topicName, clientId, permission);
258                                 throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
259                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
260                                         "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission));
261                         }
262                 } else if(operation.equals(TOPIC_CREATE_OP)){
263                         final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
264                         clientId = (user != null) ? user.getKey() : Strings.EMPTY;
265                 }
266                 return clientId;
267         }
268
269         private String getAAFclientId(HttpServletRequest request) {
270                 Principal principal = request.getUserPrincipal();
271                 if (principal !=null) {
272                         return principal.getName();
273                 } else {
274                         LOGGER.warn("Performing AAF authorization but user has not been provided in request.");
275                         return null;
276                 }
277         }
278
279         private boolean isTopicWithEnforcedAuthorization(String topicName) {
280                 String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF");
281                 return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace);
282         }
283
284         int getValueOrDefault(int value, String defaultProperty) {
285                 int returnValue = value;
286                 if (returnValue <= 0) {
287                         String defaultValue = getPropertyFromAJSCmap(defaultProperty);
288                         returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1;
289                         returnValue = (returnValue <= 0) ? 1 : returnValue;
290                 }
291                 return returnValue;
292         }
293
294         private String buildPermission(String topicName, String operation) {
295                 String nameSpace = (topicName.indexOf('.') > 1) ?
296                         topicName.substring(0, topicName.lastIndexOf('.')) : "";
297
298                 String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf");
299                 return mrFactoryValue + nameSpace + "|" + operation;
300         }
301
302         /**
303          * @param dmaapContext
304          * @param topicName
305          * @throws ConfigDbException
306          * @throws IOException
307          * @throws TopicExistsException
308          * @throws CambriaApiException
309          * @throws AccessDeniedException
310          */
311         @Override
312         public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
313                         CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
314
315                 LOGGER.info(" Deleting topic " + topicName);
316                 authorizeClient(dmaapContext, topicName, "destroy");
317
318                 final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
319                 if (topic == null) {
320                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
321                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
322                 }
323
324                 // metabroker.deleteTopic(topicName);
325
326                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
327                 respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
328         }
329
330         /**
331          * 
332          * @param dmaapContext
333          * @return
334          */
335         DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
336                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
337         }
338
339         /**
340          * @param dmaapContext
341          * @param topicName
342          * @throws ConfigDbException
343          * @throws IOException
344          * @throws TopicExistsException
345          * 
346          */
347         @Override
348         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
349                         throws ConfigDbException, IOException, TopicExistsException {
350                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
351                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
352
353                 if (topic == null) {
354                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
355                         throw new TopicExistsException(
356                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
357                 }
358
359                 final NsaAcl acl = topic.getWriterAcl();
360
361                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
362                 respondOk(dmaapContext, aclToJson(acl));
363
364         }
365
366         /**
367          * 
368          * @param acl
369          * @return
370          */
371         private static JSONObject aclToJson(NsaAcl acl) {
372                 final JSONObject o = new JSONObject();
373                 if (acl == null) {
374                         o.put("enabled", false);
375                         o.put("users", new JSONArray());
376                 } else {
377                         o.put("enabled", acl.isActive());
378
379                         final JSONArray a = new JSONArray();
380                         for (String user : acl.getUsers()) {
381                                 a.put(user);
382                         }
383                         o.put("users", a);
384                 }
385                 return o;
386         }
387
388         /**
389          * @param dmaapContext
390          * @param topicName
391          */
392         @Override
393         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
394                         throws IOException, ConfigDbException, TopicExistsException {
395                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
396                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
397
398                 if (topic == null) {
399                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
400                         throw new TopicExistsException(
401                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
402                 }
403
404                 final NsaAcl acl = topic.getReaderAcl();
405
406                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
407                 respondOk(dmaapContext, aclToJson(acl));
408
409         }
410
411         /**
412          * 
413          * @param t
414          * @return
415          */
416         static JSONObject topicToJson(Topic t) {
417                 final JSONObject o = new JSONObject();
418
419                 o.put("name", t.getName());
420                 o.put("description", t.getDescription());
421                 o.put("owner", t.getOwner());
422                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
423                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
424
425                 return o;
426         }
427
428         /**
429          * @param dmaapContext
430          *                      @param topicName @param producerId @throws
431          *            ConfigDbException @throws IOException @throws
432          *            TopicExistsException @throws AccessDeniedException @throws
433          * 
434          */
435         @Override
436         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
437                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
438
439                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
440                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
441
442                 
443                 //
444                 // LOGGER.info("Authenticating the user, as ACL authentication is not
445                 
446                 //// String permission =
447                 
448                 //
449                 
450                 
451                 
452                 // {
453                 // LOGGER.error("Failed to permit write access to producer [" +
454                 // producerId + "] for topic " + topicName
455                 
456                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
457                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
458                 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
459                 
460                 
461                 
462                 // }
463                 // }
464
465                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
466
467                 if (null == topic) {
468                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
469                                         + "] does not exist.");
470                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
471                                         + "] for topic. Topic [" + topicName + "] does not exist.");
472                 }
473
474                 topic.permitWritesFromUser(producerId, user);
475
476                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
477                                 + "]. Sending response.");
478                 respondOk(dmaapContext, "Write access has been granted to publisher.");
479
480         }
481
482         /**
483          * @param dmaapContext
484          * @param topicName
485          * @param producerId
486          * @throws ConfigDbException
487          * @throws IOException
488          * @throws TopicExistsException
489          * @throws AccessDeniedException
490          * @throws DMaaPAccessDeniedException
491          * 
492          */
493         @Override
494         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
495                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
496                         DMaaPAccessDeniedException {
497
498                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
499                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
500                 
501                 //
502                 //// String permission =
503                 
504                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
505                 // String permission = aaf.aafPermissionString(topicName, "manage");
506                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
507                 // {
508                 // LOGGER.error("Failed to revoke write access to producer [" +
509                 // producerId + "] for topic " + topicName
510                 
511                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
512                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
513                 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
514                 
515                 
516                 // throw new DMaaPAccessDeniedException(errRes);
517                 //
518         
519                 // }
520
521                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
522
523                 if (null == topic) {
524                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
525                                         + "] does not exist.");
526                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
527                                         + "] for topic. Topic [" + topicName + "] does not exist.");
528                 }
529
530                 topic.denyWritesFromUser(producerId, user);
531
532                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
533                                 + "]. Sending response.");
534                 respondOk(dmaapContext, "Write access has been revoked for publisher.");
535
536         }
537
538         /**
539          * @param dmaapContext
540          * @param topicName
541          * @param consumerId
542          * @throws DMaaPAccessDeniedException
543          */
544         @Override
545         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
546                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
547                         DMaaPAccessDeniedException {
548
549                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
550                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
551                 
552                 //
553                 //// String permission =
554                 
555                 
556                 // String permission = aaf.aafPermissionString(topicName, "manage");
557                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
558                 // {
559                 // LOGGER.error("Failed to permit read access to consumer [" +
560                 // consumerId + "] for topic " + topicName
561                 
562                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
563                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
564                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
565                 
566                 
567                 
568                 // }
569                 // }
570
571                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
572
573                 if (null == topic) {
574                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
575                                         + "] does not exist.");
576                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
577                                         + "] for topic. Topic [" + topicName + "] does not exist.");
578                 }
579
580                 topic.permitReadsByUser(consumerId, user);
581
582                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
583                                 + "]. Sending response.");
584                 respondOk(dmaapContext,
585                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
586         }
587
588         /**
589          * @param dmaapContext
590          * @param topicName
591          * @param consumerId
592          * @throws DMaaPAccessDeniedException
593          */
594         @Override
595         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
596                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
597                         DMaaPAccessDeniedException {
598
599                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
600                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
601                 
602                 //// String permission =
603                 
604                 
605                 // String permission = aaf.aafPermissionString(topicName, "manage");
606                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
607                 // {
608                 // LOGGER.error("Failed to revoke read access to consumer [" +
609                 // consumerId + "] for topic " + topicName
610                 
611                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
612                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
613                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
614                 
615                 
616                 // throw new DMaaPAccessDeniedException(errRes);
617                 // }
618                 //
619                 //
620         
621                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
622
623                 if (null == topic) {
624                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
625                                         + "] does not exist.");
626                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
627                                         + "] for topic. Topic [" + topicName + "] does not exist.");
628                 }
629
630                 topic.denyReadsByUser(consumerId, user);
631
632                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
633                                 + "]. Sending response.");
634                 respondOk(dmaapContext,
635                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
636
637         }
638
639 }