4 /*******************************************************************************
5 * ============LICENSE_START=======================================================
7 * ================================================================================
8 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 *******************************************************************************/
25 package org.onap.dmaap.dmf.mr.service.impl;
27 import java.io.IOException;
29 import org.apache.commons.lang.StringUtils;
30 import org.apache.http.HttpStatus;
31 import org.json.JSONArray;
32 import org.json.JSONException;
33 import org.json.JSONObject;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.stereotype.Service;
37 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
38 import org.onap.dmaap.dmf.mr.CambriaApiException;
39 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
40 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
41 import org.onap.dmaap.dmf.mr.beans.TopicBean;
42 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
43 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
44 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
45 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
46 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
47 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
48 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
50 import org.onap.dmaap.dmf.mr.metabroker.Topic;
51 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
52 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
53 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
54 import org.onap.dmaap.dmf.mr.service.TopicService;
55 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
56 import org.onap.dmaap.dmf.mr.utils.Utils;
57 import com.att.eelf.configuration.EELFLogger;
58 import com.att.eelf.configuration.EELFManager;
59 import com.att.nsa.configs.ConfigDbException;
60 import com.att.nsa.security.NsaAcl;
61 import com.att.nsa.security.NsaApiKey;
62 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
65 * @author muzainulhaque.qazi
69 public class TopicServiceImpl implements TopicService {
71 // private static final Logger LOGGER =
73 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
75 private DMaaPErrorMessages errorMessages;
77 // @Value("${msgRtr.topicfactory.aaf}")
80 public DMaaPErrorMessages getErrorMessages() {
84 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
85 this.errorMessages = errorMessages;
90 * @throws JSONException
91 * @throws ConfigDbException
96 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
97 LOGGER.info("Fetching list of all the topics.");
98 JSONObject json = new JSONObject();
100 JSONArray topicsList = new JSONArray();
102 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
103 topicsList.put(topic.getName());
106 json.put("topics", topicsList);
108 LOGGER.info("Returning list of all the topics.");
109 DMaaPResponseBuilder.respondOk(dmaapContext, json);
114 * @param dmaapContext
115 * @throws JSONException
116 * @throws ConfigDbException
117 * @throws IOException
120 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
122 LOGGER.info("Fetching list of all the topics.");
123 JSONObject json = new JSONObject();
125 JSONArray topicsList = new JSONArray();
127 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
128 JSONObject obj = new JSONObject();
129 obj.put("topicName", topic.getName());
131 obj.put("owner", topic.getOwner());
132 obj.put("txenabled", topic.isTransactionEnabled());
136 json.put("topics", topicsList);
138 LOGGER.info("Returning list of all the topics.");
139 DMaaPResponseBuilder.respondOk(dmaapContext, json);
144 * @param dmaapContext
146 * @throws ConfigDbException
147 * @throws IOException
148 * @throws TopicExistsException
151 public void getTopic(DMaaPContext dmaapContext, String topicName)
152 throws ConfigDbException, IOException, TopicExistsException {
154 LOGGER.info("Fetching details of topic " + topicName);
155 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
158 LOGGER.error("Topic [" + topicName + "] does not exist.");
159 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
162 JSONObject o = new JSONObject();
163 o.put("name", t.getName());
164 o.put("description", t.getDescription());
166 if (null != t.getOwners())
167 o.put("owner", t.getOwners().iterator().next());
168 if (null != t.getReaderAcl())
169 o.put("readerAcl", aclToJson(t.getReaderAcl()));
170 if (null != t.getWriterAcl())
171 o.put("writerAcl", aclToJson(t.getWriterAcl()));
173 LOGGER.info("Returning details of topic " + topicName);
174 DMaaPResponseBuilder.respondOk(dmaapContext, o);
179 * @param dmaapContext
181 * @throws CambriaApiException
182 * @throws AccessDeniedException
183 * @throws IOException
184 * @throws TopicExistsException
185 * @throws JSONException
191 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
192 throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
193 LOGGER.info("Creating topic " + topicBean.getTopicName());
195 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
197 String appName = dmaapContext.getRequest().getHeader("AppName");
198 String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
199 "enforced.topic.name.AAF");
204 if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
206 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
208 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
209 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
210 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
212 LOGGER.info(errRes.toString());
213 // throw new DMaaPAccessDeniedException(errRes);
217 // else if (user==null &&
218 // (null==dmaapContext.getRequest().getHeader("Authorization") && null
219 // == dmaapContext.getRequest().getHeader("cookie")) ) {
220 else if (Utils.isCadiEnabled()&&user == null && null == dmaapContext.getRequest().getHeader("Authorization")
221 && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
222 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
224 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
225 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
226 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
228 LOGGER.info(errRes.toString());
229 // throw new DMaaPAccessDeniedException(errRes);
232 if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization")
234 // if (user == null &&
235 // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
236 // null != dmaapContext.getRequest().getHeader("cookie"))) {
237 // ACL authentication is not provided so we will use the aaf
239 LOGGER.info("Authorization the topic");
241 String permission = "";
242 String nameSpace = "";
243 if (topicBean.getTopicName().indexOf(".") > 1)
244 nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
246 String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
247 "msgRtr.topicfactory.aaf");
249 // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
251 permission = mrFactoryVal + nameSpace + "|create";
252 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
254 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
256 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
258 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
259 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
260 "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
263 LOGGER.info(errRes.toString());
264 throw new DMaaPAccessDeniedException(errRes);
267 // if user is null and aaf authentication is ok then key should
271 * Added as part of AAF user it should return username
274 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
275 LOGGER.info("key ==================== " + key);
281 final String topicName = topicBean.getTopicName();
282 final String desc = topicBean.getTopicDescription();
283 int partition = topicBean.getPartitionCount();
284 // int replica = topicBean.getReplicationCount();
285 String defaultPartitions = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
286 "default.partitions");
287 String defaultReplicas = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
289 if (partition == 0) {
290 if(StringUtils.isNotEmpty(defaultPartitions)){
291 partition=Integer.parseInt(defaultPartitions);
297 final int partitions = partition;
299 int replica = topicBean.getReplicationCount();
301 if(StringUtils.isNotEmpty(defaultReplicas)){
302 replica=Integer.parseInt(defaultReplicas);
308 final int replicas = replica;
309 boolean transactionEnabled = topicBean.isTransactionEnabled();
311 final Broker1 metabroker = getMetaBroker(dmaapContext);
312 final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
314 LOGGER.info("Topic created successfully. Sending response");
315 DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
316 } catch (JSONException excp) {
318 LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
319 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
320 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
321 LOGGER.info(errRes.toString());
322 throw new CambriaApiException(errRes);
324 } catch (ConfigDbException excp1) {
326 LOGGER.error("Failed to create topic. Config DB Exception", excp1);
327 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
328 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
329 LOGGER.info(errRes.toString());
330 throw new CambriaApiException(errRes);
331 } catch (org.onap.dmaap.dmf.mr.metabroker.Broker1.TopicExistsException e) {
332 // TODO Auto-generated catch block
333 LOGGER.error( e.getMessage());
338 * @param dmaapContext
340 * @throws ConfigDbException
341 * @throws IOException
342 * @throws TopicExistsException
343 * @throws CambriaApiException
344 * @throws AccessDeniedException
347 public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
348 CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
351 LOGGER.info(" Deleting topic " + topicName);
353 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
354 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
355 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
356 + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
357 LOGGER.info(errRes.toString());
358 throw new DMaaPAccessDeniedException(errRes);
361 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
363 if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
364 LOGGER.info("Authenticating the user, as ACL authentication is not provided");
365 // String permission =
367 String permission = "";
368 String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
369 String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
370 "msgRtr.topicfactory.aaf");
372 permission = mrFactoryVal + nameSpace + "|destroy";
373 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
374 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
375 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
376 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
377 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
378 errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
379 + errorMessages.getNotPermitted2());
380 LOGGER.info(errRes.toString());
381 throw new DMaaPAccessDeniedException(errRes);
386 final Broker1 metabroker = getMetaBroker(dmaapContext);
387 final Topic topic = metabroker.getTopic(topicName);
390 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
391 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
394 // metabroker.deleteTopic(topicName);
396 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
397 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
402 * @param dmaapContext
405 private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
406 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
410 * @param dmaapContext
412 * @throws ConfigDbException
413 * @throws IOException
414 * @throws TopicExistsException
418 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
419 throws ConfigDbException, IOException, TopicExistsException {
420 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
421 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
424 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
425 throw new TopicExistsException(
426 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
429 final NsaAcl acl = topic.getWriterAcl();
431 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
432 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
441 private static JSONObject aclToJson(NsaAcl acl) {
442 final JSONObject o = new JSONObject();
444 o.put("enabled", false);
445 o.put("users", new JSONArray());
447 o.put("enabled", acl.isActive());
449 final JSONArray a = new JSONArray();
450 for (String user : acl.getUsers()) {
459 * @param dmaapContext
463 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
464 throws IOException, ConfigDbException, TopicExistsException {
465 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
466 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
469 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
470 throw new TopicExistsException(
471 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
474 final NsaAcl acl = topic.getReaderAcl();
476 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
477 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
486 private static JSONObject topicToJson(Topic t) {
487 final JSONObject o = new JSONObject();
489 o.put("name", t.getName());
490 o.put("description", t.getDescription());
491 o.put("owner", t.getOwner());
492 o.put("readerAcl", aclToJson(t.getReaderAcl()));
493 o.put("writerAcl", aclToJson(t.getWriterAcl()));
499 * @param dmaapContext
500 * @param topicName @param producerId @throws
501 * ConfigDbException @throws IOException @throws
502 * TopicExistsException @throws AccessDeniedException @throws
506 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
507 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
509 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
510 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
514 // LOGGER.info("Authenticating the user, as ACL authentication is not
516 //// String permission =
523 // LOGGER.error("Failed to permit write access to producer [" +
524 // producerId + "] for topic " + topicName
526 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
527 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
528 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
535 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
538 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
539 + "] does not exist.");
540 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
541 + "] for topic. Topic [" + topicName + "] does not exist.");
544 topic.permitWritesFromUser(producerId, user);
546 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
547 + "]. Sending response.");
548 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
553 * @param dmaapContext
556 * @throws ConfigDbException
557 * @throws IOException
558 * @throws TopicExistsException
559 * @throws AccessDeniedException
560 * @throws DMaaPAccessDeniedException
564 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
565 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
566 DMaaPAccessDeniedException {
568 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
569 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
572 //// String permission =
574 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
575 // String permission = aaf.aafPermissionString(topicName, "manage");
576 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
578 // LOGGER.error("Failed to revoke write access to producer [" +
579 // producerId + "] for topic " + topicName
581 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
582 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
583 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
586 // throw new DMaaPAccessDeniedException(errRes);
591 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
594 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
595 + "] does not exist.");
596 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
597 + "] for topic. Topic [" + topicName + "] does not exist.");
600 topic.denyWritesFromUser(producerId, user);
602 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
603 + "]. Sending response.");
604 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
609 * @param dmaapContext
612 * @throws DMaaPAccessDeniedException
615 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
616 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
617 DMaaPAccessDeniedException {
619 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
620 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
623 //// String permission =
626 // String permission = aaf.aafPermissionString(topicName, "manage");
627 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
629 // LOGGER.error("Failed to permit read access to consumer [" +
630 // consumerId + "] for topic " + topicName
632 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
633 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
634 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
641 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
644 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
645 + "] does not exist.");
646 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
647 + "] for topic. Topic [" + topicName + "] does not exist.");
650 topic.permitReadsByUser(consumerId, user);
652 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
653 + "]. Sending response.");
654 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
655 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
659 * @param dmaapContext
662 * @throws DMaaPAccessDeniedException
665 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
666 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
667 DMaaPAccessDeniedException {
669 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
670 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
672 //// String permission =
675 // String permission = aaf.aafPermissionString(topicName, "manage");
676 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
678 // LOGGER.error("Failed to revoke read access to consumer [" +
679 // consumerId + "] for topic " + topicName
681 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
682 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
683 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
686 // throw new DMaaPAccessDeniedException(errRes);
691 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
694 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
695 + "] does not exist.");
696 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
697 + "] for topic. Topic [" + topicName + "] does not exist.");
700 topic.denyReadsByUser(consumerId, user);
702 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
703 + "]. Sending response.");
704 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
705 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");