4 /*******************************************************************************
5 * ============LICENSE_START=======================================================
7 * ================================================================================
8 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 *******************************************************************************/
25 package org.onap.dmaap.dmf.mr.service.impl;
27 import java.io.IOException;
29 import org.apache.http.HttpStatus;
30 import org.json.JSONArray;
31 import org.json.JSONException;
32 import org.json.JSONObject;
33 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.stereotype.Service;
36 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
37 import org.onap.dmaap.dmf.mr.CambriaApiException;
38 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
39 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
40 import org.onap.dmaap.dmf.mr.beans.TopicBean;
41 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
42 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
43 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
44 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
45 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
46 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
47 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
49 import org.onap.dmaap.dmf.mr.metabroker.Topic;
50 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
51 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
52 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
53 import org.onap.dmaap.dmf.mr.service.TopicService;
54 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import com.att.eelf.configuration.EELFLogger;
57 import com.att.eelf.configuration.EELFManager;
58 import com.att.nsa.configs.ConfigDbException;
59 import com.att.nsa.security.NsaAcl;
60 import com.att.nsa.security.NsaApiKey;
61 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
64 * @author muzainulhaque.qazi
68 public class TopicServiceImpl implements TopicService {
70 // private static final Logger LOGGER =
72 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
74 private DMaaPErrorMessages errorMessages;
76 // @Value("${msgRtr.topicfactory.aaf}")
79 public DMaaPErrorMessages getErrorMessages() {
83 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
84 this.errorMessages = errorMessages;
89 * @throws JSONException
90 * @throws ConfigDbException
95 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
96 LOGGER.info("Fetching list of all the topics.");
97 JSONObject json = new JSONObject();
99 JSONArray topicsList = new JSONArray();
101 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
102 topicsList.put(topic.getName());
105 json.put("topics", topicsList);
107 LOGGER.info("Returning list of all the topics.");
108 DMaaPResponseBuilder.respondOk(dmaapContext, json);
113 * @param dmaapContext
114 * @throws JSONException
115 * @throws ConfigDbException
116 * @throws IOException
119 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
121 LOGGER.info("Fetching list of all the topics.");
122 JSONObject json = new JSONObject();
124 JSONArray topicsList = new JSONArray();
126 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
127 JSONObject obj = new JSONObject();
128 obj.put("topicName", topic.getName());
130 obj.put("owner", topic.getOwner());
131 obj.put("txenabled", topic.isTransactionEnabled());
135 json.put("topics", topicsList);
137 LOGGER.info("Returning list of all the topics.");
138 DMaaPResponseBuilder.respondOk(dmaapContext, json);
143 * @param dmaapContext
145 * @throws ConfigDbException
146 * @throws IOException
147 * @throws TopicExistsException
150 public void getTopic(DMaaPContext dmaapContext, String topicName)
151 throws ConfigDbException, IOException, TopicExistsException {
153 LOGGER.info("Fetching details of topic " + topicName);
154 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
157 LOGGER.error("Topic [" + topicName + "] does not exist.");
158 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
161 JSONObject o = new JSONObject();
162 o.put("name", t.getName());
163 o.put("description", t.getDescription());
165 if (null != t.getOwners())
166 o.put("owner", t.getOwners().iterator().next());
167 if (null != t.getReaderAcl())
168 o.put("readerAcl", aclToJson(t.getReaderAcl()));
169 if (null != t.getWriterAcl())
170 o.put("writerAcl", aclToJson(t.getWriterAcl()));
172 LOGGER.info("Returning details of topic " + topicName);
173 DMaaPResponseBuilder.respondOk(dmaapContext, o);
178 * @param dmaapContext
180 * @throws CambriaApiException
181 * @throws AccessDeniedException
182 * @throws IOException
183 * @throws TopicExistsException
184 * @throws JSONException
190 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
191 throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
192 LOGGER.info("Creating topic " + topicBean.getTopicName());
194 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
196 String appName = dmaapContext.getRequest().getHeader("AppName");
197 String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
198 "enforced.topic.name.AAF");
203 if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
205 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
207 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
208 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
209 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
211 LOGGER.info(errRes.toString());
212 // throw new DMaaPAccessDeniedException(errRes);
216 // else if (user==null &&
217 // (null==dmaapContext.getRequest().getHeader("Authorization") && null
218 // == dmaapContext.getRequest().getHeader("cookie")) ) {
219 else if (Utils.isCadiEnabled()&&user == null && null == dmaapContext.getRequest().getHeader("Authorization")
220 && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
221 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
223 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
224 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
225 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
227 LOGGER.info(errRes.toString());
228 // throw new DMaaPAccessDeniedException(errRes);
231 if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization")
233 // if (user == null &&
234 // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
235 // null != dmaapContext.getRequest().getHeader("cookie"))) {
236 // ACL authentication is not provided so we will use the aaf
238 LOGGER.info("Authorization the topic");
240 String permission = "";
241 String nameSpace = "";
242 if (topicBean.getTopicName().indexOf(".") > 1)
243 nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
245 String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
246 "msgRtr.topicfactory.aaf");
248 // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
250 permission = mrFactoryVal + nameSpace + "|create";
251 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
253 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
255 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
257 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
258 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
259 "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
262 LOGGER.info(errRes.toString());
263 throw new DMaaPAccessDeniedException(errRes);
266 // if user is null and aaf authentication is ok then key should
270 * Added as part of AAF user it should return username
273 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
274 LOGGER.info("key ==================== " + key);
280 final String topicName = topicBean.getTopicName();
281 final String desc = topicBean.getTopicDescription();
282 int partition = topicBean.getPartitionCount();
283 // int replica = topicBean.getReplicationCount();
284 if (partition == 0) {
287 final int partitions = partition;
289 int replica = topicBean.getReplicationCount();
293 final int replicas = replica;
294 boolean transactionEnabled = topicBean.isTransactionEnabled();
296 final Broker1 metabroker = getMetaBroker(dmaapContext);
297 final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
299 LOGGER.info("Topic created successfully. Sending response");
300 DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
301 } catch (JSONException excp) {
303 LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
304 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
305 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
306 LOGGER.info(errRes.toString());
307 throw new CambriaApiException(errRes);
309 } catch (ConfigDbException excp1) {
311 LOGGER.error("Failed to create topic. Config DB Exception", excp1);
312 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
313 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
314 LOGGER.info(errRes.toString());
315 throw new CambriaApiException(errRes);
316 } catch (org.onap.dmaap.dmf.mr.metabroker.Broker1.TopicExistsException e) {
317 // TODO Auto-generated catch block
318 LOGGER.error( e.getMessage());
323 * @param dmaapContext
325 * @throws ConfigDbException
326 * @throws IOException
327 * @throws TopicExistsException
328 * @throws CambriaApiException
329 * @throws AccessDeniedException
332 public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
333 CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
336 LOGGER.info(" Deleting topic " + topicName);
338 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
339 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
340 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
341 + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
342 LOGGER.info(errRes.toString());
343 throw new DMaaPAccessDeniedException(errRes);
346 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
348 if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
349 LOGGER.info("Authenticating the user, as ACL authentication is not provided");
350 // String permission =
352 String permission = "";
353 String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
354 String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
355 "msgRtr.topicfactory.aaf");
357 permission = mrFactoryVal + nameSpace + "|destroy";
358 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
359 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
360 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
361 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
362 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
363 errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
364 + errorMessages.getNotPermitted2());
365 LOGGER.info(errRes.toString());
366 throw new DMaaPAccessDeniedException(errRes);
371 final Broker1 metabroker = getMetaBroker(dmaapContext);
372 final Topic topic = metabroker.getTopic(topicName);
375 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
376 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
379 // metabroker.deleteTopic(topicName);
381 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
382 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
387 * @param dmaapContext
390 private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
391 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
395 * @param dmaapContext
397 * @throws ConfigDbException
398 * @throws IOException
399 * @throws TopicExistsException
403 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
404 throws ConfigDbException, IOException, TopicExistsException {
405 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
406 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
409 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
410 throw new TopicExistsException(
411 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
414 final NsaAcl acl = topic.getWriterAcl();
416 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
417 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
426 private static JSONObject aclToJson(NsaAcl acl) {
427 final JSONObject o = new JSONObject();
429 o.put("enabled", false);
430 o.put("users", new JSONArray());
432 o.put("enabled", acl.isActive());
434 final JSONArray a = new JSONArray();
435 for (String user : acl.getUsers()) {
444 * @param dmaapContext
448 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
449 throws IOException, ConfigDbException, TopicExistsException {
450 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
451 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
454 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
455 throw new TopicExistsException(
456 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
459 final NsaAcl acl = topic.getReaderAcl();
461 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
462 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
471 private static JSONObject topicToJson(Topic t) {
472 final JSONObject o = new JSONObject();
474 o.put("name", t.getName());
475 o.put("description", t.getDescription());
476 o.put("owner", t.getOwner());
477 o.put("readerAcl", aclToJson(t.getReaderAcl()));
478 o.put("writerAcl", aclToJson(t.getWriterAcl()));
484 * @param dmaapContext
485 * @param topicName @param producerId @throws
486 * ConfigDbException @throws IOException @throws
487 * TopicExistsException @throws AccessDeniedException @throws
491 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
492 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
494 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
495 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
499 // LOGGER.info("Authenticating the user, as ACL authentication is not
501 //// String permission =
508 // LOGGER.error("Failed to permit write access to producer [" +
509 // producerId + "] for topic " + topicName
511 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
512 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
513 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
520 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
523 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
524 + "] does not exist.");
525 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
526 + "] for topic. Topic [" + topicName + "] does not exist.");
529 topic.permitWritesFromUser(producerId, user);
531 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
532 + "]. Sending response.");
533 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
538 * @param dmaapContext
541 * @throws ConfigDbException
542 * @throws IOException
543 * @throws TopicExistsException
544 * @throws AccessDeniedException
545 * @throws DMaaPAccessDeniedException
549 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
550 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
551 DMaaPAccessDeniedException {
553 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
554 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
557 //// String permission =
559 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
560 // String permission = aaf.aafPermissionString(topicName, "manage");
561 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
563 // LOGGER.error("Failed to revoke write access to producer [" +
564 // producerId + "] for topic " + topicName
566 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
567 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
568 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
571 // throw new DMaaPAccessDeniedException(errRes);
576 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
579 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
580 + "] does not exist.");
581 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
582 + "] for topic. Topic [" + topicName + "] does not exist.");
585 topic.denyWritesFromUser(producerId, user);
587 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
588 + "]. Sending response.");
589 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
594 * @param dmaapContext
597 * @throws DMaaPAccessDeniedException
600 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
601 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
602 DMaaPAccessDeniedException {
604 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
605 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
608 //// String permission =
611 // String permission = aaf.aafPermissionString(topicName, "manage");
612 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
614 // LOGGER.error("Failed to permit read access to consumer [" +
615 // consumerId + "] for topic " + topicName
617 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
618 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
619 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
626 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
629 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
630 + "] does not exist.");
631 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
632 + "] for topic. Topic [" + topicName + "] does not exist.");
635 topic.permitReadsByUser(consumerId, user);
637 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
638 + "]. Sending response.");
639 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
640 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
644 * @param dmaapContext
647 * @throws DMaaPAccessDeniedException
650 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
651 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
652 DMaaPAccessDeniedException {
654 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
655 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
657 //// String permission =
660 // String permission = aaf.aafPermissionString(topicName, "manage");
661 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
663 // LOGGER.error("Failed to revoke read access to consumer [" +
664 // consumerId + "] for topic " + topicName
666 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
667 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
668 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
671 // throw new DMaaPAccessDeniedException(errRes);
676 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
679 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
680 + "] does not exist.");
681 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
682 + "] for topic. Topic [" + topicName + "] does not exist.");
685 topic.denyReadsByUser(consumerId, user);
687 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
688 + "]. Sending response.");
689 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
690 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");