1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
25 package com.att.nsa.cambria.service.impl;
27 import java.io.IOException;
29 import org.apache.http.HttpStatus;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import org.json.JSONArray;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.springframework.beans.factory.annotation.Autowired;
36 import org.springframework.stereotype.Service;
38 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
39 import com.att.nsa.cambria.CambriaApiException;
40 import com.att.nsa.cambria.beans.DMaaPContext;
41 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
42 import com.att.nsa.cambria.beans.TopicBean;
43 import com.att.nsa.cambria.constants.CambriaConstants;
44 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
45 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
46 import com.att.nsa.cambria.exception.DMaaPResponseCode;
47 import com.att.nsa.cambria.exception.ErrorResponse;
48 import com.att.nsa.cambria.metabroker.Broker;
49 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
50 import com.att.nsa.cambria.metabroker.Topic;
51 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
52 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
53 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
54 import com.att.nsa.cambria.service.TopicService;
55 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.security.NsaAcl;
58 import com.att.nsa.security.NsaApiKey;
59 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
66 public class TopicServiceImpl implements TopicService {
// Legacy logger declaration, superseded by the EELF logger declared right below:
//private static final Logger LOGGER = Logger.getLogger(TopicServiceImpl.class);
/** EELF logger shared by every method of this service. */
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
/** Error-message fragments used to compose ErrorResponse texts; assigned through setErrorMessages. */
private DMaaPErrorMessages errorMessages;
// Disabled property injection: the "msgRtr.topicfactory.aaf" value is instead read through
// AJSCPropertiesMap at the point of use.
//@Value("${msgRtr.topicfactory.aaf}")
//private String mrFactory;
79 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
80 this.errorMessages = errorMessages;
85 * @throws JSONException
86 * @throws ConfigDbException
91 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
93 LOGGER.info("Fetching list of all the topics.");
94 JSONObject json = new JSONObject();
96 JSONArray topicsList = new JSONArray();
98 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
99 topicsList.put(topic.getName());
102 json.put("topics", topicsList);
104 LOGGER.info("Returning list of all the topics.");
105 DMaaPResponseBuilder.respondOk(dmaapContext, json);
110 * @param dmaapContext
111 * @throws JSONException
112 * @throws ConfigDbException
113 * @throws IOException
116 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
118 LOGGER.info("Fetching list of all the topics.");
119 JSONObject json = new JSONObject();
121 JSONArray topicsList = new JSONArray();
123 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
124 JSONObject obj = new JSONObject();
125 obj.put("topicName", topic.getName());
126 //obj.put("description", topic.getDescription());
127 obj.put("owner", topic.getOwner());
128 obj.put("txenabled", topic.isTransactionEnabled());
132 json.put("topics", topicsList);
134 LOGGER.info("Returning list of all the topics.");
135 DMaaPResponseBuilder.respondOk(dmaapContext, json);
141 * @param dmaapContext
143 * @throws ConfigDbException
144 * @throws IOException
145 * @throws TopicExistsException
148 public void getTopic(DMaaPContext dmaapContext, String topicName)
149 throws ConfigDbException, IOException, TopicExistsException {
151 LOGGER.info("Fetching details of topic " + topicName);
152 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
155 LOGGER.error("Topic [" + topicName + "] does not exist.");
156 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
159 JSONObject o = new JSONObject();
160 o.put ( "name", t.getName () );
161 o.put ( "description", t.getDescription () );
163 if (null!=t.getOwners ())
164 o.put ( "owner", t.getOwners ().iterator ().next () );
165 if(null!=t.getReaderAcl ())
166 o.put ( "readerAcl", aclToJson ( t.getReaderAcl () ) );
167 if(null!=t.getWriterAcl ())
168 o.put ( "writerAcl", aclToJson ( t.getWriterAcl () ) );
170 LOGGER.info("Returning details of topic " + topicName);
171 DMaaPResponseBuilder.respondOk(dmaapContext, o);
177 * @param dmaapContext
179 * @throws CambriaApiException
180 * @throws AccessDeniedException
181 * @throws IOException
182 * @throws TopicExistsException
183 * @throws JSONException
189 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
190 throws CambriaApiException, DMaaPAccessDeniedException,IOException, TopicExistsException {
192 LOGGER.info("Creating topic " + topicBean.getTopicName());
194 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
196 String appName=dmaapContext.getRequest().getHeader("AppName");
197 String enfTopicName= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
203 if( enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >=0 ) {
205 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
207 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
208 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
209 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
210 LOGGER.info(errRes.toString());
211 throw new DMaaPAccessDeniedException(errRes);
216 //else if (user==null && (null==dmaapContext.getRequest().getHeader("Authorization") && null == dmaapContext.getRequest().getHeader("cookie")) ) {
217 else if (user == null && null==dmaapContext.getRequest().getHeader("Authorization") &&
218 (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
219 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
221 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
222 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
223 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
224 LOGGER.info(errRes.toString());
225 throw new DMaaPAccessDeniedException(errRes);
228 if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") ||
229 null != dmaapContext.getRequest().getHeader("cookie"))) {
230 //if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || null != dmaapContext.getRequest().getHeader("cookie"))) {
231 // ACL authentication is not provided so we will use the aaf authentication
232 LOGGER.info("Authorization the topic");
234 String permission = "";
236 if(topicBean.getTopicName().indexOf(".")>1)
237 nameSpace = topicBean.getTopicName().substring(0,topicBean.getTopicName().lastIndexOf("."));
239 String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
241 //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
243 permission = mrFactoryVal+nameSpace+"|create";
244 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
246 if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
249 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
251 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
252 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
253 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
254 LOGGER.info(errRes.toString());
255 throw new DMaaPAccessDeniedException(errRes);
258 // if user is null and aaf authentication is ok then key should be ""
261 * Added as part of AAF user it should return username
264 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
265 LOGGER.info("key ==================== "+key);
271 final String topicName = topicBean.getTopicName();
272 final String desc = topicBean.getTopicDescription();
274 final int partitions = topicBean.getPartitionCount();
276 final int replicas = topicBean.getReplicationCount();
277 boolean transactionEnabled = topicBean.isTransactionEnabled();
280 final Broker metabroker = getMetaBroker(dmaapContext);
281 final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas,
284 LOGGER.info("Topic created successfully. Sending response");
285 DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
286 } catch (JSONException excp) {
288 LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
289 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
290 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
291 errorMessages.getIncorrectJson());
292 LOGGER.info(errRes.toString());
293 throw new CambriaApiException(errRes);
299 * @param dmaapContext
301 * @throws ConfigDbException
302 * @throws IOException
303 * @throws TopicExistsException
304 * @throws CambriaApiException
305 * @throws AccessDeniedException
308 public void deleteTopic(DMaaPContext dmaapContext, String topicName)
309 throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
311 LOGGER.info("Deleting topic " + topicName);
312 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
314 if (user == null && null!=dmaapContext.getRequest().getHeader("Authorization")) {
315 LOGGER.info("Authenticating the user, as ACL authentication is not provided");
316 // String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
317 String permission = "";
318 String nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
319 String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
320 // String tokens[] = topicName.split(".mr.topic.");
321 permission = mrFactoryVal+nameSpace+"|destroy";
322 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
323 if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
325 LOGGER.error("Failed to delete topi"+topicName+". Authentication failed.");
326 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
327 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
328 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" delete "+errorMessages.getNotPermitted2());
329 LOGGER.info(errRes.toString());
330 throw new DMaaPAccessDeniedException(errRes);
336 final Broker metabroker = getMetaBroker(dmaapContext);
337 final Topic topic = metabroker.getTopic(topicName);
340 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
341 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
344 metabroker.deleteTopic(topicName);
346 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
347 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
353 * @param dmaapContext
356 private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
357 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
361 * @param dmaapContext
363 * @throws ConfigDbException
364 * @throws IOException
365 * @throws TopicExistsException
369 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
370 throws ConfigDbException, IOException, TopicExistsException {
371 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
372 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
375 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
376 throw new TopicExistsException(
377 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
382 final NsaAcl acl = topic.getWriterAcl();
384 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
385 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
394 private static JSONObject aclToJson(NsaAcl acl) {
395 final JSONObject o = new JSONObject();
397 o.put("enabled", false);
398 o.put("users", new JSONArray());
400 o.put("enabled", acl.isActive());
402 final JSONArray a = new JSONArray();
403 for (String user : acl.getUsers()) {
412 * @param dmaapContext
416 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
417 throws IOException, ConfigDbException, TopicExistsException {
418 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
419 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
422 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
423 throw new TopicExistsException(
424 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
427 final NsaAcl acl = topic.getReaderAcl();
429 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
430 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
439 private static JSONObject topicToJson(Topic t) {
440 final JSONObject o = new JSONObject();
442 o.put("name", t.getName());
443 o.put("description", t.getDescription());
444 o.put("owner", t.getOwner());
445 o.put("readerAcl", aclToJson(t.getReaderAcl()));
446 o.put("writerAcl", aclToJson(t.getWriterAcl()));
452 * @param dmaapContext
455 * @throws ConfigDbException
456 * @throws IOException
457 * @throws TopicExistsException
458 * @throws AccessDeniedException
463 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
464 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException {
466 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
467 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
469 // if (user == null) {
471 // LOGGER.info("Authenticating the user, as ACL authentication is not provided");
472 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
474 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
475 // String permission = aaf.aafPermissionString(topicName, "manage");
476 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
478 // LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic " + topicName
479 // + ". Authentication failed.");
480 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
481 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
482 // errorMessages.getNotPermitted1()+" <Grant publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
483 // LOGGER.info(errRes);
484 // throw new DMaaPAccessDeniedException(errRes);
488 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
491 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
492 + "] does not exist.");
493 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
494 + "] for topic. Topic [" + topicName + "] does not exist.");
497 topic.permitWritesFromUser(producerId, user);
499 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
500 + "]. Sending response.");
501 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
506 * @param dmaapContext
509 * @throws ConfigDbException
510 * @throws IOException
511 * @throws TopicExistsException
512 * @throws AccessDeniedException
513 * @throws DMaaPAccessDeniedException
517 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
518 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
520 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
521 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
522 // if (user == null) {
524 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
525 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
526 // String permission = aaf.aafPermissionString(topicName, "manage");
527 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
529 // LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic " + topicName
530 // + ". Authentication failed.");
531 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
532 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
533 // errorMessages.getNotPermitted1()+" <Revoke publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
534 // LOGGER.info(errRes);
535 // throw new DMaaPAccessDeniedException(errRes);
540 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
543 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
544 + "] does not exist.");
545 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
546 + "] for topic. Topic [" + topicName + "] does not exist.");
549 topic.denyWritesFromUser(producerId, user);
551 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
552 + "]. Sending response.");
553 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
558 * @param dmaapContext
561 * @throws DMaaPAccessDeniedException
564 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
565 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
567 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
568 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
569 // if (user == null) {
571 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
572 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
573 // String permission = aaf.aafPermissionString(topicName, "manage");
574 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
576 // LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic " + topicName
577 // + ". Authentication failed.");
578 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
579 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
580 // errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
581 // LOGGER.info(errRes);
582 // throw new DMaaPAccessDeniedException(errRes);
586 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
589 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
590 + "] does not exist.");
591 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
592 + "] for topic. Topic [" + topicName + "] does not exist.");
595 topic.permitReadsByUser(consumerId, user);
597 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
598 + "]. Sending response.");
599 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
600 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
604 * @param dmaapContext
607 * @throws DMaaPAccessDeniedException
610 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
611 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
613 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
614 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
615 // if (user == null) {
616 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
617 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
618 // String permission = aaf.aafPermissionString(topicName, "manage");
619 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
621 // LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic " + topicName
622 // + ". Authentication failed.");
623 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
624 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
625 // errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
626 // LOGGER.info(errRes);
627 // throw new DMaaPAccessDeniedException(errRes);
633 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
636 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
637 + "] does not exist.");
638 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
639 + "] for topic. Topic [" + topicName + "] does not exist.");
642 topic.denyReadsByUser(consumerId, user);
644 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
645 + "]. Sending response.");
646 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
647 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");