4 /*******************************************************************************
5 * ============LICENSE_START=======================================================
7 * ================================================================================
8 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 *******************************************************************************/
25 package com.att.dmf.mr.service.impl;
27 import java.io.IOException;
29 import org.apache.http.HttpStatus;
30 import org.json.JSONArray;
31 import org.json.JSONException;
32 import org.json.JSONObject;
33 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.stereotype.Service;
36 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
37 import com.att.dmf.mr.CambriaApiException;
38 import com.att.dmf.mr.beans.DMaaPContext;
39 import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
40 import com.att.dmf.mr.beans.TopicBean;
41 import com.att.dmf.mr.constants.CambriaConstants;
42 import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
43 import com.att.dmf.mr.exception.DMaaPErrorMessages;
44 import com.att.dmf.mr.exception.DMaaPResponseCode;
45 import com.att.dmf.mr.exception.ErrorResponse;
46 import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
47 import com.att.dmf.mr.metabroker.Broker1;
48 //import com.att.dmf.mr.metabroker.Broker1;
49 import com.att.dmf.mr.metabroker.Topic;
50 import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
51 import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
52 import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
53 import com.att.dmf.mr.service.TopicService;
54 import com.att.dmf.mr.utils.DMaaPResponseBuilder;
55 import com.att.eelf.configuration.EELFLogger;
56 import com.att.eelf.configuration.EELFManager;
57 import com.att.nsa.configs.ConfigDbException;
58 import com.att.nsa.security.NsaAcl;
59 import com.att.nsa.security.NsaApiKey;
60 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
63 * @author muzainulhaque.qazi
// Service implementation of TopicService. The topic operations below take a
// DMaaPContext, reach the meta broker through that context, and write their
// result with DMaaPResponseBuilder.
67 public class TopicServiceImpl implements TopicService {
69 // private static final Logger LOGGER =
70 // Logger.getLogger(TopicServiceImpl.class);
// Class-wide EELF logger used for the info and error traces in this class.
71 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
// Supplies the message fragments used when ErrorResponse objects are built.
// NOTE(review): Autowired is imported by this file, so this field is presumably
// injected by Spring - confirm, the annotation is not visible here.
73 private DMaaPErrorMessages errorMessages;
75 // @Value("${msgRtr.topicfactory.aaf}")
76 // private String mrFactory;
// Getter for the DMaaPErrorMessages instance held by this service.
// NOTE(review): the method body is not visible in this listing; presumably it
// returns the errorMessages field - confirm.
78 public DMaaPErrorMessages getErrorMessages() {
// Replaces the DMaaPErrorMessages instance this service uses when it builds
// error responses.
82 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
83 this.errorMessages = errorMessages;
88 * @throws JSONException
89 * @throws ConfigDbException
// Responds with a JSON object whose topics entry is an array holding the name
// of every topic returned by the meta broker.
// Declared to throw JSONException, ConfigDbException and IOException.
94 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
95 LOGGER.info("Fetching list of all the topics.");
96 JSONObject json = new JSONObject();
98 JSONArray topicsList = new JSONArray();
// Only the topic name is collected here; getAllTopics builds a fuller entry.
100 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
101 topicsList.put(topic.getName());
104 json.put("topics", topicsList);
106 LOGGER.info("Returning list of all the topics.");
107 DMaaPResponseBuilder.respondOk(dmaapContext, json);
112 * @param dmaapContext
113 * @throws JSONException
114 * @throws ConfigDbException
115 * @throws IOException
// Responds with a JSON object whose topics entry is built from every topic
// returned by the meta broker. For each topic a JSON object with topicName,
// owner and txenabled is prepared.
// Declared to throw JSONException, ConfigDbException and IOException.
118 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
120 LOGGER.info("Fetching list of all the topics.");
121 JSONObject json = new JSONObject();
123 JSONArray topicsList = new JSONArray();
125 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
126 JSONObject obj = new JSONObject();
127 obj.put("topicName", topic.getName());
128 // obj.put("description", topic.getDescription());
129 obj.put("owner", topic.getOwner());
130 obj.put("txenabled", topic.isTransactionEnabled());
// NOTE(review): the statement that appends obj to topicsList is not visible in
// this listing - confirm it exists, otherwise the array stays empty.
134 json.put("topics", topicsList);
136 LOGGER.info("Returning list of all the topics.");
137 DMaaPResponseBuilder.respondOk(dmaapContext, json);
142 * @param dmaapContext
144 * @throws ConfigDbException
145 * @throws IOException
146 * @throws TopicExistsException
// Responds with the details of a single topic as JSON: name and description are
// always written; owner, readerAcl and writerAcl are written only when the
// matching getter returns non-null.
// TopicExistsException is the type thrown when the topic does NOT exist, which
// is the opposite of what the exception name suggests.
149 public void getTopic(DMaaPContext dmaapContext, String topicName)
150 throws ConfigDbException, IOException, TopicExistsException {
152 LOGGER.info("Fetching details of topic " + topicName);
153 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding the next two lines is not visible in
// this listing; presumably it is a null check on t - confirm.
156 LOGGER.error("Topic [" + topicName + "] does not exist.");
157 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
160 JSONObject o = new JSONObject();
161 o.put("name", t.getName());
162 o.put("description", t.getDescription());
// Only the first element of the owners iteration is reported as owner.
// NOTE(review): next() is called after a null check only; an empty owners
// collection would raise NoSuchElementException - confirm it is never empty.
164 if (null != t.getOwners())
165 o.put("owner", t.getOwners().iterator().next());
166 if (null != t.getReaderAcl())
167 o.put("readerAcl", aclToJson(t.getReaderAcl()));
168 if (null != t.getWriterAcl())
169 o.put("writerAcl", aclToJson(t.getWriterAcl()));
171 LOGGER.info("Returning details of topic " + topicName);
172 DMaaPResponseBuilder.respondOk(dmaapContext, o);
177 * @param dmaapContext
179 * @throws CambriaApiException
180 * @throws AccessDeniedException
181 * @throws IOException
182 * @throws TopicExistsException
183 * @throws JSONException
// Creates a topic from the values carried by topicBean (name, description,
// partition count, replication count, transaction flag) through the meta
// broker and responds with the JSON form of the created topic.
// A request without an authenticated user is rejected with a
// DMaaPAccessDeniedException carrying a 401 ErrorResponse; the AAF based
// permission check that used to sit in that path is commented out.
// JSONException and ConfigDbException raised during creation are converted to
// CambriaApiException with a 400 ErrorResponse.
// NOTE(review): several lines of this method are not visible in this listing
// (closing braces, the declaration of key, the defaults for partition and
// replica); the notes below mark the places that need confirming.
189 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
190 throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
192 LOGGER.info("Creating topic " + topicBean.getTopicName());
194 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
196 //String appName = dmaapContext.getRequest().getHeader("AppName");
// Name fragment read from the msgRtr properties; topic names containing it take
// the branch below.
197 String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
198 "enforced.topic.name.AAF");
203 if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
205 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
207 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
208 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
209 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
211 LOGGER.info(errRes.toString());
// The throw for this branch is commented out, so the visible code only logs
// the built response here.
212 // throw new DMaaPAccessDeniedException(errRes);
216 // else if (user==null &&
217 // (null==dmaapContext.getRequest().getHeader("Authorization") && null
218 // == dmaapContext.getRequest().getHeader("cookie")) ) {
219 /*else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization")
221 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
223 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
224 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
225 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
227 LOGGER.info(errRes.toString());
228 // throw new DMaaPAccessDeniedException(errRes);
231 if (user == null /*&& (null != dmaapContext.getRequest().getHeader("Authorization")
233 // if (user == null &&
234 // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
235 // null != dmaapContext.getRequest().getHeader("cookie"))) {
236 // ACL authentication is not provided so we will use the aaf
238 /*LOGGER.info("Authorization the topic");
240 String permission = "";
241 String nameSpace = "";
242 if (topicBean.getTopicName().indexOf(".") > 1)
243 nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
245 String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
246 "msgRtr.topicfactory.aaf");
248 // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
250 permission = mrFactoryVal + nameSpace + "|create";
251 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();*/
253 //if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
// Rejection path for a request without an authenticated user: log, build a 401
// ErrorResponse and throw DMaaPAccessDeniedException.
255 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
257 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
258 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
// NOTE(review): the message expression below continues on lines that are not
// visible in this listing.
259 "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
264 LOGGER.info(errRes.toString());
265 throw new DMaaPAccessDeniedException(errRes);
268 // if user is null and aaf authentication is ok then key should
272 * Added as part of AAF user it should return username
275 //key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
277 //LOGGER.info("key ==================== " + key);
// Copy the requested topic settings out of the bean before calling the broker.
283 final String topicName = topicBean.getTopicName();
284 final String desc = topicBean.getTopicDescription();
285 int partition = topicBean.getPartitionCount();
286 // int replica = topicBean.getReplicationCount();
// NOTE(review): the body of this check is not visible; presumably a default
// partition count replaces zero - confirm.
287 if (partition == 0) {
290 final int partitions = partition;
// NOTE(review): lines between this read and the final copy are not visible;
// presumably a similar default is applied to replica - confirm.
292 int replica = topicBean.getReplicationCount();
297 final int replicas = replica;
298 boolean transactionEnabled = topicBean.isTransactionEnabled();
300 final Broker1 metabroker = getMetaBroker(dmaapContext);
// NOTE(review): key is declared on a line that is not visible in this listing.
301 final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
303 LOGGER.info("Topic created successfully. Sending response");
304 DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
305 } catch (JSONException excp) {
307 LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
308 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
309 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
310 LOGGER.info(errRes.toString());
311 throw new CambriaApiException(errRes);
313 } catch (ConfigDbException excp1) {
// NOTE(review): a config DB failure is reported with the INCORRECT_JSON code and
// message and a 400 status, the same as the JSON branch above - this looks
// copied; confirm whether a server side error code was intended.
315 LOGGER.error("Failed to create topic. Config DB Exception", excp1);
316 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
317 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
318 LOGGER.info(errRes.toString());
319 throw new CambriaApiException(errRes);
// NOTE(review): the handling inside this catch is not visible in this listing -
// confirm the exception is not silently swallowed.
320 } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
321 // TODO Auto-generated catch block
327 * @param dmaapContext
329 * @throws ConfigDbException
330 * @throws IOException
331 * @throws TopicExistsException
332 * @throws CambriaApiException
333 * @throws AccessDeniedException
// Deletes the named topic through the meta broker and responds with an HTML
// confirmation message.
// Failure paths visible in this method:
//  - a DMaaPAccessDeniedException carrying a 403 ErrorResponse (the condition
//    that selects this path is not visible in this listing);
//  - a TopicExistsException when the topic is reported as not existing (the
//    guard is not visible either; presumably a null check on topic - confirm);
//  - a CambriaApiException with status 500 when the broker delete call raises
//    Broker1.TopicExistsException.
// The AAF based permission check is commented out.
// NOTE(review): the CambriaApiException is created without the caught
// exception, so the original cause is lost - confirm whether that is intended.
336 public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
337 CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
339 LOGGER.info(" Deleting topic " + topicName);
// NOTE(review): the 403 message below is assembled from getCreateTopicFail even
// though this is the delete operation - confirm the wording is intended.
341 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
342 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
343 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
344 + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
345 LOGGER.info(errRes.toString());
346 throw new DMaaPAccessDeniedException(errRes);
349 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
351 /*if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
352 LOGGER.info("Authenticating the user, as ACL authentication is not provided");
353 // String permission =
354 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
355 String permission = "";
356 String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
357 String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
358 "msgRtr.topicfactory.aaf");
359 // String tokens[] = topicName.split(".mr.topic.");
360 permission = mrFactoryVal + nameSpace + "|destroy";
361 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
362 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
363 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
364 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
365 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
366 errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
367 + errorMessages.getNotPermitted2());
368 LOGGER.info(errRes.toString());
369 throw new DMaaPAccessDeniedException(errRes);
374 final Broker1 metabroker = getMetaBroker(dmaapContext);
375 final Topic topic = metabroker.getTopic(topicName);
378 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
379 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
383 metabroker.deleteTopic(topicName);
384 } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
385 // TODO Auto-generated catch block
386 throw new CambriaApiException(500, "failed to delete the topic");
389 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
390 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
396 * @param dmaapContext
// Returns the meta broker held by the config reader of the given context, cast
// to DMaaPKafkaMetaBroker.
// NOTE(review): the unchecked cast raises ClassCastException if the config
// reader ever holds a different broker implementation - confirm it cannot.
399 private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
400 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
404 * @param dmaapContext
406 * @throws ConfigDbException
407 * @throws IOException
408 * @throws TopicExistsException
// Responds with the writer ACL of the named topic, converted by aclToJson.
// TopicExistsException is thrown when the topic is reported as not existing.
412 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
413 throws ConfigDbException, IOException, TopicExistsException {
414 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
415 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding the error path below is not visible in
// this listing; presumably it is a null check on topic - confirm.
418 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
419 throw new TopicExistsException(
420 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
// Publishers are the users of the writer ACL.
423 final NsaAcl acl = topic.getWriterAcl();
425 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
426 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
// Converts an NsaAcl to a JSON object with an enabled flag and a users array.
// Two variants are visible: one writes enabled as false with an empty users
// array, the other writes enabled from acl.isActive() and iterates over
// acl.getUsers().
// NOTE(review): the condition choosing between the two variants, the loop body
// and the return statement are not visible in this listing; presumably the
// first variant handles a null acl - confirm.
435 private static JSONObject aclToJson(NsaAcl acl) {
436 final JSONObject o = new JSONObject();
438 o.put("enabled", false);
439 o.put("users", new JSONArray());
441 o.put("enabled", acl.isActive());
443 final JSONArray a = new JSONArray();
444 for (String user : acl.getUsers()) {
453 * @param dmaapContext
// Responds with the reader ACL of the named topic, converted by aclToJson.
// TopicExistsException is thrown when the topic is reported as not existing.
457 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
458 throws IOException, ConfigDbException, TopicExistsException {
459 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
460 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding the error path below is not visible in
// this listing; presumably it is a null check on topic - confirm.
463 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
464 throw new TopicExistsException(
465 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
// Consumers are the users of the reader ACL.
468 final NsaAcl acl = topic.getReaderAcl();
470 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
471 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
// Builds the JSON form of a topic: name, description, owner, and the reader and
// writer ACLs converted by aclToJson.
// Unlike getTopic, the ACL getters are passed to aclToJson without a null check
// and the owner comes from getOwner rather than from the owners collection.
// NOTE(review): the return statement is not visible in this listing; presumably
// the populated object o is returned - confirm.
480 private static JSONObject topicToJson(Topic t) {
481 final JSONObject o = new JSONObject();
483 o.put("name", t.getName());
484 o.put("description", t.getDescription());
485 o.put("owner", t.getOwner());
486 o.put("readerAcl", aclToJson(t.getReaderAcl()));
487 o.put("writerAcl", aclToJson(t.getWriterAcl()));
493 * @param dmaapContext
494 * @param topicName @param producerId @throws
495 * ConfigDbException @throws IOException @throws
496 * TopicExistsException @throws AccessDeniedException @throws
// Grants write access on the named topic to producerId by calling
// permitWritesFromUser on the topic with the authenticated user, then responds
// with an HTML confirmation.
// TopicExistsException is thrown when the topic is reported as not existing.
// The AAF based permission check is commented out, so the only check visible
// here is whatever permitWritesFromUser itself enforces.
500 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
501 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
503 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
// May be null when the request carries no recognised API key; it is handed to
// the topic as is.
504 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
506 // if (user == null) {
508 // LOGGER.info("Authenticating the user, as ACL authentication is not
510 //// String permission =
511 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
513 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
514 // String permission = aaf.aafPermissionString(topicName, "manage");
515 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
517 // LOGGER.error("Failed to permit write access to producer [" +
518 // producerId + "] for topic " + topicName
519 // + ". Authentication failed.");
520 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
521 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
522 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
523 // "+errorMessages.getNotPermitted2()+ topicName);
524 // LOGGER.info(errRes);
525 // throw new DMaaPAccessDeniedException(errRes);
529 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding the error path below is not visible in
// this listing; presumably it is a null check on topic - confirm.
532 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
533 + "] does not exist.");
534 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
535 + "] for topic. Topic [" + topicName + "] does not exist.");
538 topic.permitWritesFromUser(producerId, user);
540 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
541 + "]. Sending response.");
542 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
547 * @param dmaapContext
550 * @throws ConfigDbException
551 * @throws IOException
552 * @throws TopicExistsException
553 * @throws AccessDeniedException
554 * @throws DMaaPAccessDeniedException
// Revokes write access on the named topic from producerId by calling
// denyWritesFromUser on the topic with the authenticated user, then responds
// with an HTML confirmation.
// TopicExistsException is thrown when the topic is reported as not existing.
// The AAF based permission check is commented out.
558 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
559 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
560 DMaaPAccessDeniedException {
562 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
563 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
564 // if (user == null) {
566 //// String permission =
567 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
568 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
569 // String permission = aaf.aafPermissionString(topicName, "manage");
570 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
572 // LOGGER.error("Failed to revoke write access to producer [" +
573 // producerId + "] for topic " + topicName
574 // + ". Authentication failed.");
575 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
576 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
577 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
578 // "+errorMessages.getNotPermitted2()+ topicName);
579 // LOGGER.info(errRes);
580 // throw new DMaaPAccessDeniedException(errRes);
585 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding the error path below is not visible in
// this listing; presumably it is a null check on topic - confirm.
588 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
589 + "] does not exist.");
590 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
591 + "] for topic. Topic [" + topicName + "] does not exist.");
594 topic.denyWritesFromUser(producerId, user);
596 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
597 + "]. Sending response.");
598 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
603 * @param dmaapContext
606 * @throws DMaaPAccessDeniedException
// Grants read access on the named topic to consumerId by calling
// permitReadsByUser on the topic with the authenticated user, then responds
// with an HTML confirmation naming the consumer and the topic.
// TopicExistsException is thrown when the topic is reported as not existing.
// The AAF based permission check is commented out.
609 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
610 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
611 DMaaPAccessDeniedException {
613 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
614 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
615 // if (user == null) {
617 //// String permission =
618 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
619 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
620 // String permission = aaf.aafPermissionString(topicName, "manage");
621 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
623 // LOGGER.error("Failed to permit read access to consumer [" +
624 // consumerId + "] for topic " + topicName
625 // + ". Authentication failed.");
626 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
627 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
628 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
629 // "+errorMessages.getNotPermitted2()+ topicName);
630 // LOGGER.info(errRes);
631 // throw new DMaaPAccessDeniedException(errRes);
635 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding the error path below is not visible in
// this listing; presumably it is a null check on topic - confirm.
638 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
639 + "] does not exist.");
640 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
641 + "] for topic. Topic [" + topicName + "] does not exist.");
644 topic.permitReadsByUser(consumerId, user);
646 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
647 + "]. Sending response.");
648 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
649 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
653 * @param dmaapContext
656 * @throws DMaaPAccessDeniedException
// Revokes read access on the named topic from consumerId by calling
// denyReadsByUser on the topic with the authenticated user, then responds with
// an HTML confirmation naming the consumer and the topic.
// TopicExistsException is thrown when the topic is reported as not existing.
// The AAF based permission check is commented out.
659 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
660 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
661 DMaaPAccessDeniedException {
663 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
664 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
665 // if (user == null) {
666 //// String permission =
667 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
668 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
669 // String permission = aaf.aafPermissionString(topicName, "manage");
670 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
672 // LOGGER.error("Failed to revoke read access to consumer [" +
673 // consumerId + "] for topic " + topicName
674 // + ". Authentication failed.");
675 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
676 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
677 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
678 // "+errorMessages.getNotPermitted2()+ topicName);
679 // LOGGER.info(errRes);
680 // throw new DMaaPAccessDeniedException(errRes);
686 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding the error path below is not visible in
// this listing; presumably it is a null check on topic - confirm.
// NOTE(review): the log line says revoke while the exception text says permit;
// the exception text looks copied from permitConsumerForTopic - confirm.
689 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
690 + "] does not exist.");
691 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
692 + "] for topic. Topic [" + topicName + "] does not exist.");
695 topic.denyReadsByUser(consumerId, user);
697 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
698 + "]. Sending response.");
699 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
700 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");