1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
25 package com.att.nsa.cambria.service.impl;
27 import java.io.IOException;
29 import org.apache.http.HttpStatus;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import org.json.JSONArray;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.springframework.beans.factory.annotation.Autowired;
36 import org.springframework.stereotype.Service;
38 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
39 import com.att.nsa.cambria.CambriaApiException;
40 import com.att.nsa.cambria.beans.DMaaPContext;
41 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
42 import com.att.nsa.cambria.beans.TopicBean;
43 import com.att.nsa.cambria.constants.CambriaConstants;
44 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
45 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
46 import com.att.nsa.cambria.exception.DMaaPResponseCode;
47 import com.att.nsa.cambria.exception.ErrorResponse;
48 import com.att.nsa.cambria.metabroker.Broker;
49 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
50 import com.att.nsa.cambria.metabroker.Topic;
51 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
52 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
53 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
54 import com.att.nsa.cambria.service.TopicService;
55 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.security.NsaAcl;
58 import com.att.nsa.security.NsaApiKey;
59 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
66 public class TopicServiceImpl implements TopicService {
68 //private static final Logger LOGGER = Logger.getLogger(TopicServiceImpl.class);
69 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
71 private DMaaPErrorMessages errorMessages;
75 //@Value("${msgRtr.topicfactory.aaf}")
76 //private String mrFactory;
79 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
80 this.errorMessages = errorMessages;
85 * @throws JSONException
86 * @throws ConfigDbException
91 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
93 LOGGER.info("Fetching list of all the topics.");
94 JSONObject json = new JSONObject();
96 JSONArray topicsList = new JSONArray();
98 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
99 topicsList.put(topic.getName());
102 json.put("topics", topicsList);
104 LOGGER.info("Returning list of all the topics.");
105 DMaaPResponseBuilder.respondOk(dmaapContext, json);
110 * @param dmaapContext
111 * @throws JSONException
112 * @throws ConfigDbException
113 * @throws IOException
116 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
118 LOGGER.info("Fetching list of all the topics.");
119 JSONObject json = new JSONObject();
121 JSONArray topicsList = new JSONArray();
123 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
124 JSONObject obj = new JSONObject();
125 obj.put("topicName", topic.getName());
126 //obj.put("description", topic.getDescription());
127 obj.put("owner", topic.getOwner());
128 obj.put("txenabled", topic.isTransactionEnabled());
132 json.put("topics", topicsList);
134 LOGGER.info("Returning list of all the topics.");
135 DMaaPResponseBuilder.respondOk(dmaapContext, json);
141 * @param dmaapContext
143 * @throws ConfigDbException
144 * @throws IOException
145 * @throws TopicExistsException
148 public void getTopic(DMaaPContext dmaapContext, String topicName)
149 throws ConfigDbException, IOException, TopicExistsException {
151 LOGGER.info("Fetching details of topic " + topicName);
152 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
155 LOGGER.error("Topic [" + topicName + "] does not exist.");
156 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
159 JSONObject o = new JSONObject();
160 o.put ( "name", t.getName () );
161 o.put ( "description", t.getDescription () );
163 if (null!=t.getOwners ())
164 o.put ( "owner", t.getOwners ().iterator ().next () );
165 if(null!=t.getReaderAcl ())
166 o.put ( "readerAcl", aclToJson ( t.getReaderAcl () ) );
167 if(null!=t.getWriterAcl ())
168 o.put ( "writerAcl", aclToJson ( t.getWriterAcl () ) );
170 LOGGER.info("Returning details of topic " + topicName);
171 DMaaPResponseBuilder.respondOk(dmaapContext, o);
177 * @param dmaapContext
179 * @throws CambriaApiException
180 * @throws AccessDeniedException
181 * @throws IOException
182 * @throws TopicExistsException
183 * @throws JSONException
189 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
190 throws CambriaApiException, DMaaPAccessDeniedException,IOException, TopicExistsException {
192 LOGGER.info("Creating topic " + topicBean.getTopicName());
194 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
196 String appName=dmaapContext.getRequest().getHeader("AppName");
197 String enfTopicName= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
202 if( enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >=0 ) {
204 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
206 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
207 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
208 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
209 LOGGER.info(errRes.toString());
210 throw new DMaaPAccessDeniedException(errRes);
215 //else if (user==null && (null==dmaapContext.getRequest().getHeader("Authorization") && null == dmaapContext.getRequest().getHeader("cookie")) ) {
216 /*else if (user == null && null==dmaapContext.getRequest().getHeader("Authorization") &&
217 (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
218 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
220 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
221 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
222 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
223 LOGGER.info(errRes.toString());
224 throw new DMaaPAccessDeniedException(errRes);
227 if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization"))) {
228 //if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || null != dmaapContext.getRequest().getHeader("cookie"))) {
229 // ACL authentication is not provided so we will use the aaf authentication
230 LOGGER.info("Authorization the topic");
232 String permission = "";
234 if(topicBean.getTopicName().indexOf(".")>1)
235 nameSpace = topicBean.getTopicName().substring(0,topicBean.getTopicName().lastIndexOf("."));
237 String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
239 //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
241 permission = mrFactoryVal+nameSpace+"|create";
242 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
244 if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
247 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
249 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
250 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
251 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
252 LOGGER.info(errRes.toString());
253 throw new DMaaPAccessDeniedException(errRes);
256 // if user is null and aaf authentication is ok then key should be ""
259 * Added as part of AAF user it should return username
262 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
263 LOGGER.info("key ==================== "+key);
269 final String topicName = topicBean.getTopicName();
270 final String desc = topicBean.getTopicDescription();
272 final int partitions = topicBean.getPartitionCount();
274 final int replicas = topicBean.getReplicationCount();
275 boolean transactionEnabled = topicBean.isTransactionEnabled();
278 final Broker metabroker = getMetaBroker(dmaapContext);
279 final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas,
282 LOGGER.info("Topic created successfully. Sending response");
283 DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
284 } catch (JSONException excp) {
286 LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
287 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
288 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
289 errorMessages.getIncorrectJson());
290 LOGGER.info(errRes.toString());
291 throw new CambriaApiException(errRes);
297 * @param dmaapContext
299 * @throws ConfigDbException
300 * @throws IOException
301 * @throws TopicExistsException
302 * @throws CambriaApiException
303 * @throws AccessDeniedException
306 public void deleteTopic(DMaaPContext dmaapContext, String topicName)
307 throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
309 LOGGER.info("Deleting topic " + topicName);
310 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
312 if (user == null && null!=dmaapContext.getRequest().getHeader("Authorization")) {
313 LOGGER.info("Authenticating the user, as ACL authentication is not provided");
314 // String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
315 String permission = "";
317 nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
318 String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
319 // String tokens[] = topicName.split(".mr.topic.");
320 permission = mrFactoryVal+nameSpace+"|destroy";
321 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
322 if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
324 LOGGER.error("Failed to delete topi"+topicName+". Authentication failed.");
325 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
326 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
327 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" delete "+errorMessages.getNotPermitted2());
328 LOGGER.info(errRes.toString());
329 throw new DMaaPAccessDeniedException(errRes);
335 final Broker metabroker = getMetaBroker(dmaapContext);
336 final Topic topic = metabroker.getTopic(topicName);
339 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
340 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
343 metabroker.deleteTopic(topicName);
345 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
346 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
352 * @param dmaapContext
355 private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
356 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
360 * @param dmaapContext
362 * @throws ConfigDbException
363 * @throws IOException
364 * @throws TopicExistsException
368 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
369 throws ConfigDbException, IOException, TopicExistsException {
370 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
371 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
374 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
375 throw new TopicExistsException(
376 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
381 final NsaAcl acl = topic.getWriterAcl();
383 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
384 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
393 private static JSONObject aclToJson(NsaAcl acl) {
394 final JSONObject o = new JSONObject();
396 o.put("enabled", false);
397 o.put("users", new JSONArray());
399 o.put("enabled", acl.isActive());
401 final JSONArray a = new JSONArray();
402 for (String user : acl.getUsers()) {
411 * @param dmaapContext
415 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
416 throws IOException, ConfigDbException, TopicExistsException {
417 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
418 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
421 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
422 throw new TopicExistsException(
423 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
426 final NsaAcl acl = topic.getReaderAcl();
428 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
429 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
438 private static JSONObject topicToJson(Topic t) {
439 final JSONObject o = new JSONObject();
441 o.put("name", t.getName());
442 o.put("description", t.getDescription());
443 o.put("owner", t.getOwner());
444 o.put("readerAcl", aclToJson(t.getReaderAcl()));
445 o.put("writerAcl", aclToJson(t.getWriterAcl()));
451 * @param dmaapContext
454 * @throws ConfigDbException
455 * @throws IOException
456 * @throws TopicExistsException
457 * @throws AccessDeniedException
462 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
463 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException {
465 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
466 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
468 // if (user == null) {
470 // LOGGER.info("Authenticating the user, as ACL authentication is not provided");
471 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
473 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
474 // String permission = aaf.aafPermissionString(topicName, "manage");
475 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
477 // LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic " + topicName
478 // + ". Authentication failed.");
479 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
480 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
481 // errorMessages.getNotPermitted1()+" <Grant publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
482 // LOGGER.info(errRes);
483 // throw new DMaaPAccessDeniedException(errRes);
487 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
490 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
491 + "] does not exist.");
492 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
493 + "] for topic. Topic [" + topicName + "] does not exist.");
496 topic.permitWritesFromUser(producerId, user);
498 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
499 + "]. Sending response.");
500 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
505 * @param dmaapContext
508 * @throws ConfigDbException
509 * @throws IOException
510 * @throws TopicExistsException
511 * @throws AccessDeniedException
512 * @throws DMaaPAccessDeniedException
516 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
517 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
519 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
520 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
521 // if (user == null) {
523 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
524 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
525 // String permission = aaf.aafPermissionString(topicName, "manage");
526 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
528 // LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic " + topicName
529 // + ". Authentication failed.");
530 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
531 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
532 // errorMessages.getNotPermitted1()+" <Revoke publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
533 // LOGGER.info(errRes);
534 // throw new DMaaPAccessDeniedException(errRes);
539 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
542 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
543 + "] does not exist.");
544 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
545 + "] for topic. Topic [" + topicName + "] does not exist.");
548 topic.denyWritesFromUser(producerId, user);
550 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
551 + "]. Sending response.");
552 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
557 * @param dmaapContext
560 * @throws DMaaPAccessDeniedException
563 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
564 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
566 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
567 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
568 // if (user == null) {
570 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
571 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
572 // String permission = aaf.aafPermissionString(topicName, "manage");
573 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
575 // LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic " + topicName
576 // + ". Authentication failed.");
577 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
578 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
579 // errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
580 // LOGGER.info(errRes);
581 // throw new DMaaPAccessDeniedException(errRes);
585 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
588 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
589 + "] does not exist.");
590 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
591 + "] for topic. Topic [" + topicName + "] does not exist.");
594 topic.permitReadsByUser(consumerId, user);
596 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
597 + "]. Sending response.");
598 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
599 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
603 * @param dmaapContext
606 * @throws DMaaPAccessDeniedException
609 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
610 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
612 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
613 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
614 // if (user == null) {
615 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
616 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
617 // String permission = aaf.aafPermissionString(topicName, "manage");
618 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
620 // LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic " + topicName
621 // + ". Authentication failed.");
622 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
623 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
624 // errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
625 // LOGGER.info(errRes);
626 // throw new DMaaPAccessDeniedException(errRes);
632 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
635 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
636 + "] does not exist.");
637 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
638 + "] for topic. Topic [" + topicName + "] does not exist.");
641 topic.denyReadsByUser(consumerId, user);
643 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
644 + "]. Sending response.");
645 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
646 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");