DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / TopicServiceImpl.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
8  * =================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *  
21  */
22 package org.onap.dmaap.dmf.mr.service.impl;
23
24 import com.att.ajsc.beans.PropertiesMapBean;
25 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28 import com.att.nsa.configs.ConfigDbException;
29 import com.att.nsa.security.NsaAcl;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import joptsimple.internal.Strings;
33 import org.apache.commons.lang.StringUtils;
34 import org.apache.commons.lang.math.NumberUtils;
35 import org.apache.http.HttpStatus;
36 import org.json.JSONArray;
37 import org.json.JSONException;
38 import org.json.JSONObject;
39 import org.onap.dmaap.dmf.mr.CambriaApiException;
40 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
41 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
42 import org.onap.dmaap.dmf.mr.beans.TopicBean;
43 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
44 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
45 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
47 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
48 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
49 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
50 import org.onap.dmaap.dmf.mr.metabroker.Topic;
51 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
52 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
53 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
54 import org.onap.dmaap.dmf.mr.service.TopicService;
55 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
56 import org.onap.dmaap.dmf.mr.utils.Utils;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.stereotype.Service;
59
60 import javax.servlet.http.HttpServletRequest;
61 import java.io.IOException;
62 import java.security.Principal;
63
64 /**
65  * @author muzainulhaque.qazi
66  *
67  */
68 @Service
69 public class TopicServiceImpl implements TopicService {
70
71         private static final String TOPIC_CREATE_OP = "create";
72         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
73         @Autowired
74         private DMaaPErrorMessages errorMessages;
75
76         public DMaaPErrorMessages getErrorMessages() {
77                 return errorMessages;
78         }
79
80         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
81                 this.errorMessages = errorMessages;
82         }
83
84
85   String getPropertyFromAJSCbean(String propertyKey) {
86                 return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
87         }
88
89         String getPropertyFromAJSCmap(String propertyKey) {
90                 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
91         }
92
93         NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) {
94                 return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
95         }
96
97         void respondOk(DMaaPContext context, String msg) {
98                 DMaaPResponseBuilder.respondOkWithHtml(context, msg);
99         }
100
101         void respondOk(DMaaPContext context, JSONObject json) throws IOException {
102                 DMaaPResponseBuilder.respondOk(context, json);
103         }
104
105         boolean isCadiEnabled() {
106                 return Utils.isCadiEnabled();
107         }
108         /**
109          * @param dmaapContext
110          * @throws JSONException
111          * @throws ConfigDbException
112          * @throws IOException
113          * 
114          */
115         @Override
116         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
117                 LOGGER.info("Fetching list of all the topics.");
118                 JSONObject json = new JSONObject();
119
120                 JSONArray topicsList = new JSONArray();
121
122                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
123                         topicsList.put(topic.getName());
124                 }
125
126                 json.put("topics", topicsList);
127
128                 LOGGER.info("Returning list of all the topics.");
129                 respondOk(dmaapContext, json);
130
131         }
132
133         /**
134          * @param dmaapContext
135          * @throws JSONException
136          * @throws ConfigDbException
137          * @throws IOException
138          * 
139          */
140         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
141
142                 LOGGER.info("Fetching list of all the topics.");
143                 JSONObject json = new JSONObject();
144
145                 JSONArray topicsList = new JSONArray();
146
147                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
148                         JSONObject obj = new JSONObject();
149                         obj.put("topicName", topic.getName());
150                         
151                         obj.put("owner", topic.getOwner());
152                         obj.put("txenabled", topic.isTransactionEnabled());
153                         topicsList.put(obj);
154                 }
155
156                 json.put("topics", topicsList);
157
158                 LOGGER.info("Returning list of all the topics.");
159                 respondOk(dmaapContext, json);
160
161         }
162
163         /**
164          * @param dmaapContext
165          * @param topicName
166          * @throws ConfigDbException
167          * @throws IOException
168          * @throws TopicExistsException
169          */
170         @Override
171         public void getTopic(DMaaPContext dmaapContext, String topicName)
172                         throws ConfigDbException, IOException, TopicExistsException {
173
174                 LOGGER.info("Fetching details of topic " + topicName);
175                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
176
177                 if (null == t) {
178                         LOGGER.error("Topic [" + topicName + "] does not exist.");
179                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
180                 }
181
182                 JSONObject o = new JSONObject();
183                 o.put("name", t.getName());
184                 o.put("description", t.getDescription());
185
186                 if (null != t.getOwners())
187                         o.put("owner", t.getOwners().iterator().next());
188                 if (null != t.getReaderAcl())
189                         o.put("readerAcl", aclToJson(t.getReaderAcl()));
190                 if (null != t.getWriterAcl())
191                         o.put("writerAcl", aclToJson(t.getWriterAcl()));
192
193                 LOGGER.info("Returning details of topic " + topicName);
194                 respondOk(dmaapContext, o);
195
196         }
197
198         /**
199          * @param dmaapContext
200          * @param topicBean
201          * @throws CambriaApiException
202          * @throws AccessDeniedException
203          * @throws IOException
204          * @throws TopicExistsException
205          * @throws JSONException
206          * 
207          * 
208          * 
209          */
210         @Override
211         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException {
212                 String topicName = topicBean.getTopicName();
213                 LOGGER.info("Creating topic {}",topicName);
214                 String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
215
216                 try {
217                         final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions");
218                         final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas");
219
220                         final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
221                                 key, partitions, replicas, topicBean.isTransactionEnabled());
222
223                         LOGGER.info("Topic {} created successfully. Sending response", topicName);
224                         respondOk(dmaapContext, topicToJson(t));
225                 } catch (JSONException ex) {
226
227                         LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
228                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
229                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
230                         LOGGER.info(errRes.toString());
231                         throw new CambriaApiException(errRes);
232
233                 } catch (ConfigDbException ex) {
234
235                         LOGGER.error("Failed to create topic "+ topicName +".  Config DB Exception", ex);
236                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
237                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
238                         LOGGER.info(errRes.toString());
239                         throw new CambriaApiException(errRes);
240                 } catch (Broker1.TopicExistsException ex) {
241                         LOGGER.error( "Failed to create topic "+ topicName +".  Topic already exists.",ex);
242                 }
243         }
244
245         private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException {
246                 String clientId = Strings.EMPTY;
247                 if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) {
248                         LOGGER.info("Performing AAF authorization for topic {} creation.", topicName);
249                         String permission = buildPermission(topicName, operation);
250                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
251                         clientId = getAAFclientId(dmaapContext.getRequest());
252
253                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
254                                 LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}",
255                                         operation, topicName, clientId, permission);
256                                 throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
257                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
258                                         "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission));
259                         }
260                 } else if(operation.equals(TOPIC_CREATE_OP)){
261                         final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
262                         clientId = (user != null) ? user.getKey() : Strings.EMPTY;
263                 }
264                 return clientId;
265         }
266
267         private String getAAFclientId(HttpServletRequest request) {
268                 Principal principal = request.getUserPrincipal();
269                 if (principal !=null) {
270                         return principal.getName();
271                 } else {
272                         LOGGER.warn("Performing AAF authorization but user has not been provided in request.");
273                         return null;
274                 }
275         }
276
277         private boolean isTopicWithEnforcedAuthorization(String topicName) {
278                 String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF");
279                 return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace);
280         }
281
282         int getValueOrDefault(int value, String defaultProperty) {
283                 int returnValue = value;
284                 if (returnValue <= 0) {
285                         String defaultValue = getPropertyFromAJSCmap(defaultProperty);
286                         returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1;
287                         returnValue = (returnValue <= 0) ? 1 : returnValue;
288                 }
289                 return returnValue;
290         }
291
292         private String buildPermission(String topicName, String operation) {
293                 String nameSpace = (topicName.indexOf('.') > 1) ?
294                         topicName.substring(0, topicName.lastIndexOf('.')) : "";
295
296                 String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf");
297                 return mrFactoryValue + nameSpace + "|" + operation;
298         }
299
300         /**
301          * @param dmaapContext
302          * @param topicName
303          * @throws ConfigDbException
304          * @throws IOException
305          * @throws TopicExistsException
306          * @throws CambriaApiException
307          * @throws AccessDeniedException
308          */
309         @Override
310         public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
311                         CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
312
313                 LOGGER.info(" Deleting topic " + topicName);
314                 authorizeClient(dmaapContext, topicName, "destroy");
315
316                 final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
317                 if (topic == null) {
318                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
319                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
320                 }
321
322                 // metabroker.deleteTopic(topicName);
323
324                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
325                 respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
326         }
327
328         /**
329          * 
330          * @param dmaapContext
331          * @return
332          */
333         DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
334                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
335         }
336
337         /**
338          * @param dmaapContext
339          * @param topicName
340          * @throws ConfigDbException
341          * @throws IOException
342          * @throws TopicExistsException
343          * 
344          */
345         @Override
346         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
347                         throws ConfigDbException, IOException, TopicExistsException {
348                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
349                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
350
351                 if (topic == null) {
352                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
353                         throw new TopicExistsException(
354                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
355                 }
356
357                 final NsaAcl acl = topic.getWriterAcl();
358
359                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
360                 respondOk(dmaapContext, aclToJson(acl));
361
362         }
363
364         /**
365          * 
366          * @param acl
367          * @return
368          */
369         private static JSONObject aclToJson(NsaAcl acl) {
370                 final JSONObject o = new JSONObject();
371                 if (acl == null) {
372                         o.put("enabled", false);
373                         o.put("users", new JSONArray());
374                 } else {
375                         o.put("enabled", acl.isActive());
376
377                         final JSONArray a = new JSONArray();
378                         for (String user : acl.getUsers()) {
379                                 a.put(user);
380                         }
381                         o.put("users", a);
382                 }
383                 return o;
384         }
385
386         /**
387          * @param dmaapContext
388          * @param topicName
389          */
390         @Override
391         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
392                         throws IOException, ConfigDbException, TopicExistsException {
393                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
394                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
395
396                 if (topic == null) {
397                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
398                         throw new TopicExistsException(
399                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
400                 }
401
402                 final NsaAcl acl = topic.getReaderAcl();
403
404                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
405                 respondOk(dmaapContext, aclToJson(acl));
406
407         }
408
409         /**
410          * 
411          * @param t
412          * @return
413          */
414         static JSONObject topicToJson(Topic t) {
415                 final JSONObject o = new JSONObject();
416
417                 o.put("name", t.getName());
418                 o.put("description", t.getDescription());
419                 o.put("owner", t.getOwner());
420                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
421                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
422
423                 return o;
424         }
425
426         /**
427          * @param dmaapContext
428          *                      @param topicName @param producerId @throws
429          *            ConfigDbException @throws IOException @throws
430          *            TopicExistsException @throws AccessDeniedException @throws
431          * 
432          */
433         @Override
434         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
435                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
436
437                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
438                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
439
440                 
441                 //
442                 // LOGGER.info("Authenticating the user, as ACL authentication is not
443                 
444                 //// String permission =
445                 
446                 //
447                 
448                 
449                 
450                 // {
451                 // LOGGER.error("Failed to permit write access to producer [" +
452                 // producerId + "] for topic " + topicName
453                 
454                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
455                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
456                 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
457                 
458                 
459                 
460                 // }
461                 // }
462
463                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
464
465                 if (null == topic) {
466                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
467                                         + "] does not exist.");
468                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
469                                         + "] for topic. Topic [" + topicName + "] does not exist.");
470                 }
471
472                 topic.permitWritesFromUser(producerId, user);
473
474                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
475                                 + "]. Sending response.");
476                 respondOk(dmaapContext, "Write access has been granted to publisher.");
477
478         }
479
480         /**
481          * @param dmaapContext
482          * @param topicName
483          * @param producerId
484          * @throws ConfigDbException
485          * @throws IOException
486          * @throws TopicExistsException
487          * @throws AccessDeniedException
488          * @throws DMaaPAccessDeniedException
489          * 
490          */
491         @Override
492         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
493                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
494                         DMaaPAccessDeniedException {
495
496                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
497                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
498                 
499                 //
500                 //// String permission =
501                 
502                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
503                 // String permission = aaf.aafPermissionString(topicName, "manage");
504                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
505                 // {
506                 // LOGGER.error("Failed to revoke write access to producer [" +
507                 // producerId + "] for topic " + topicName
508                 
509                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
510                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
511                 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
512                 
513                 
514                 // throw new DMaaPAccessDeniedException(errRes);
515                 //
516         
517                 // }
518
519                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
520
521                 if (null == topic) {
522                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
523                                         + "] does not exist.");
524                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
525                                         + "] for topic. Topic [" + topicName + "] does not exist.");
526                 }
527
528                 topic.denyWritesFromUser(producerId, user);
529
530                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
531                                 + "]. Sending response.");
532                 respondOk(dmaapContext, "Write access has been revoked for publisher.");
533
534         }
535
536         /**
537          * @param dmaapContext
538          * @param topicName
539          * @param consumerId
540          * @throws DMaaPAccessDeniedException
541          */
542         @Override
543         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
544                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
545                         DMaaPAccessDeniedException {
546
547                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
548                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
549                 
550                 //
551                 //// String permission =
552                 
553                 
554                 // String permission = aaf.aafPermissionString(topicName, "manage");
555                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
556                 // {
557                 // LOGGER.error("Failed to permit read access to consumer [" +
558                 // consumerId + "] for topic " + topicName
559                 
560                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
561                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
562                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
563                 
564                 
565                 
566                 // }
567                 // }
568
569                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
570
571                 if (null == topic) {
572                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
573                                         + "] does not exist.");
574                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
575                                         + "] for topic. Topic [" + topicName + "] does not exist.");
576                 }
577
578                 topic.permitReadsByUser(consumerId, user);
579
580                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
581                                 + "]. Sending response.");
582                 respondOk(dmaapContext,
583                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
584         }
585
586         /**
587          * @param dmaapContext
588          * @param topicName
589          * @param consumerId
590          * @throws DMaaPAccessDeniedException
591          */
592         @Override
593         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
594                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
595                         DMaaPAccessDeniedException {
596
597                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
598                 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
599                 
600                 //// String permission =
601                 
602                 
603                 // String permission = aaf.aafPermissionString(topicName, "manage");
604                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
605                 // {
606                 // LOGGER.error("Failed to revoke read access to consumer [" +
607                 // consumerId + "] for topic " + topicName
608                 
609                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
610                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
611                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
612                 
613                 
614                 // throw new DMaaPAccessDeniedException(errRes);
615                 // }
616                 //
617                 //
618         
619                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
620
621                 if (null == topic) {
622                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
623                                         + "] does not exist.");
624                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
625                                         + "] for topic. Topic [" + topicName + "] does not exist.");
626                 }
627
628                 topic.denyReadsByUser(consumerId, user);
629
630                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
631                                 + "]. Sending response.");
632                 respondOk(dmaapContext,
633                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
634
635         }
636
637 }