1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
25 package com.att.nsa.cambria.service.impl;
27 import java.io.IOException;
29 import org.apache.http.HttpStatus;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import org.json.JSONArray;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.springframework.beans.factory.annotation.Autowired;
36 import org.springframework.stereotype.Service;
38 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
39 import com.att.nsa.cambria.CambriaApiException;
40 import com.att.nsa.cambria.beans.DMaaPContext;
41 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
42 import com.att.nsa.cambria.beans.TopicBean;
43 import com.att.nsa.cambria.constants.CambriaConstants;
44 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
45 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
46 import com.att.nsa.cambria.exception.DMaaPResponseCode;
47 import com.att.nsa.cambria.exception.ErrorResponse;
48 import com.att.nsa.cambria.metabroker.Broker;
49 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
50 import com.att.nsa.cambria.metabroker.Topic;
51 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
52 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
53 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
54 import com.att.nsa.cambria.service.TopicService;
55 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.security.NsaAcl;
58 import com.att.nsa.security.NsaApiKey;
59 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
66 public class TopicServiceImpl implements TopicService {
68 //private static final Logger LOGGER = Logger.getLogger(TopicServiceImpl.class);
// Class-wide EELF logger used by every method of this service.
69 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
// Source of the message fragments used when building ErrorResponse objects.
// NOTE(review): how this field is populated is not visible here (presumably
// injected by the container) - confirm it can never be null when an error
// path runs.
71 private DMaaPErrorMessages errorMessages;
73 //@Value("${msgRtr.topicfactory.aaf}")
74 //private String mrFactory;
79 * @throws JSONException
80 * @throws ConfigDbException
// Responds with a JSON object whose "topics" entry is an array holding the
// name of every topic reported by the meta broker's getAllTopics().
// The result is written to the caller through DMaaPResponseBuilder.respondOk;
// the method itself returns nothing.
85 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
87 LOGGER.info("Fetching list of all the topics.");
88 JSONObject json = new JSONObject();
90 JSONArray topicsList = new JSONArray();
// Only the topic name is exposed by this variant.
92 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
93 topicsList.put(topic.getName());
96 json.put("topics", topicsList);
98 LOGGER.info("Returning list of all the topics.");
99 DMaaPResponseBuilder.respondOk(dmaapContext, json);
104 * @param dmaapContext
105 * @throws JSONException
106 * @throws ConfigDbException
107 * @throws IOException
// Responds with a JSON object whose "topics" entry is built from every topic
// reported by the meta broker. For each topic a JSON object carrying
// "topicName", "owner" and "txenabled" is prepared.
// The result is written through DMaaPResponseBuilder.respondOk.
110 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
112 LOGGER.info("Fetching list of all the topics.");
113 JSONObject json = new JSONObject();
115 JSONArray topicsList = new JSONArray();
117 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
118 JSONObject obj = new JSONObject();
119 obj.put("topicName", topic.getName());
120 //obj.put("description", topic.getDescription());
121 obj.put("owner", topic.getOwner());
122 obj.put("txenabled", topic.isTransactionEnabled());
// NOTE(review): the statement that appends obj to topicsList is not visible
// in this view - confirm it exists, otherwise "topics" is always empty.
126 json.put("topics", topicsList);
128 LOGGER.info("Returning list of all the topics.");
129 DMaaPResponseBuilder.respondOk(dmaapContext, json);
135 * @param dmaapContext
137 * @throws ConfigDbException
138 * @throws IOException
139 * @throws TopicExistsException
// Looks up a single topic by name through the meta broker and responds with a
// JSON object holding its "name" and "description", plus "owner", "readerAcl"
// and "writerAcl" when the corresponding value is available.
// A TopicExistsException carrying a "does not exist" message is raised on the
// not-found path.
142 public void getTopic(DMaaPContext dmaapContext, String topicName)
143 throws ConfigDbException, IOException, TopicExistsException {
145 LOGGER.info("Fetching details of topic " + topicName);
146 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding this error path is not visible in this
// view; presumably it fires when the lookup above returned null - confirm.
149 LOGGER.error("Topic [" + topicName + "] does not exist.");
150 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
153 JSONObject o = new JSONObject();
154 o.put ( "name", t.getName () );
155 o.put ( "description", t.getDescription () );
// Only the first owner is reported. An owner collection that is non-null but
// empty would make iterator().next() throw NoSuchElementException, so the
// entry is emitted only when at least one owner is present.
157 if (null!=t.getOwners () && t.getOwners ().iterator ().hasNext ())
158 o.put ( "owner", t.getOwners ().iterator ().next () );
159 if(null!=t.getReaderAcl ())
160 o.put ( "readerAcl", aclToJson ( t.getReaderAcl () ) );
161 if(null!=t.getWriterAcl ())
162 o.put ( "writerAcl", aclToJson ( t.getWriterAcl () ) );
164 LOGGER.info("Returning details of topic " + topicName);
165 DMaaPResponseBuilder.respondOk(dmaapContext, o);
171 * @param dmaapContext
173 * @throws CambriaApiException
174 * @throws DMaaPAccessDeniedException
175 * @throws IOException
176 * @throws TopicExistsException
177 * @throws JSONException
183 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
184 throws CambriaApiException, DMaaPAccessDeniedException,IOException, TopicExistsException {
186 LOGGER.info("Creating topic " + topicBean.getTopicName());
188 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
190 String appName=dmaapContext.getRequest().getHeader("AppName");
191 String enfTopicName= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
197 if( enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >=0 ) {
199 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
201 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
202 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
203 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
204 LOGGER.info(errRes.toString());
205 throw new DMaaPAccessDeniedException(errRes);
210 //else if (user==null && (null==dmaapContext.getRequest().getHeader("Authorization") && null == dmaapContext.getRequest().getHeader("cookie")) ) {
211 else if (user == null && null==dmaapContext.getRequest().getHeader("Authorization") &&
212 (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
213 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
215 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
216 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
217 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
218 LOGGER.info(errRes.toString());
219 throw new DMaaPAccessDeniedException(errRes);
222 if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") ||
223 null != dmaapContext.getRequest().getHeader("cookie"))) {
224 //if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || null != dmaapContext.getRequest().getHeader("cookie"))) {
225 // ACL authentication is not provided so we will use the aaf authentication
226 LOGGER.info("Authorization the topic");
228 String permission = "";
230 if(topicBean.getTopicName().indexOf(".")>1)
231 nameSpace = topicBean.getTopicName().substring(0,topicBean.getTopicName().lastIndexOf("."));
233 String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
235 //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
237 permission = mrFactoryVal+nameSpace+"|create";
238 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
240 if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
243 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
245 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
246 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
247 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
248 LOGGER.info(errRes.toString());
249 throw new DMaaPAccessDeniedException(errRes);
252 // if user is null and aaf authentication is ok then key should be ""
255 * Added as part of AAF user it should return username
258 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
259 LOGGER.info("key ==================== "+key);
265 final String topicName = topicBean.getTopicName();
266 final String desc = topicBean.getTopicDescription();
268 final int partitions = topicBean.getPartitionCount();
270 final int replicas = topicBean.getReplicationCount();
271 boolean transactionEnabled = topicBean.isTransactionEnabled();
274 final Broker metabroker = getMetaBroker(dmaapContext);
275 final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas,
278 LOGGER.info("Topic created successfully. Sending response");
279 DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
280 } catch (JSONException excp) {
282 LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
283 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
284 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(),
285 errorMessages.getIncorrectJson());
286 LOGGER.info(errRes.toString());
287 throw new CambriaApiException(errRes);
293 * @param dmaapContext
295 * @throws ConfigDbException
296 * @throws IOException
297 * @throws TopicExistsException
298 * @throws CambriaApiException
299 * @throws AccessDeniedException
302 public void deleteTopic(DMaaPContext dmaapContext, String topicName)
303 throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
305 LOGGER.info("Deleting topic " + topicName);
306 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
308 if (user == null && null!=dmaapContext.getRequest().getHeader("Authorization")) {
309 LOGGER.info("Authenticating the user, as ACL authentication is not provided");
310 // String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
311 String permission = "";
312 String nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
313 String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
314 // String tokens[] = topicName.split(".mr.topic.");
315 permission = mrFactoryVal+nameSpace+"|destroy";
316 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
317 if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
319 LOGGER.error("Failed to delete topi"+topicName+". Authentication failed.");
320 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
321 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
322 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" delete "+errorMessages.getNotPermitted2());
323 LOGGER.info(errRes.toString());
324 throw new DMaaPAccessDeniedException(errRes);
330 final Broker metabroker = getMetaBroker(dmaapContext);
331 final Topic topic = metabroker.getTopic(topicName);
334 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
335 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
338 metabroker.deleteTopic(topicName);
340 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
341 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
347 * @param dmaapContext
// Returns the meta broker held by the context's config reader, cast to
// DMaaPKafkaMetaBroker. The cast fails with ClassCastException if the config
// reader holds any other broker implementation.
350 private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
351 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
355 * @param dmaapContext
357 * @throws ConfigDbException
358 * @throws IOException
359 * @throws TopicExistsException
// Responds with the writer ACL of the named topic, rendered by aclToJson.
// A TopicExistsException carrying a "does not exist" message is raised on the
// not-found path.
363 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
364 throws ConfigDbException, IOException, TopicExistsException {
365 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
366 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding this error path is not visible in this
// view; presumably it fires when the lookup above returned null - confirm.
369 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
370 throw new TopicExistsException(
371 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
// Publishers are the users listed in the topic's writer ACL.
376 final NsaAcl acl = topic.getWriterAcl();
378 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
379 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
// Renders an ACL as a JSON object with an "enabled" flag and a "users" array.
// NOTE(review): the branch conditions and the tail of this method are not
// visible in this view; the notes below are assumptions to confirm.
388 private static JSONObject aclToJson(NsaAcl acl) {
389 final JSONObject o = new JSONObject();
// Presumably the fallback used when acl is null: disabled, no users.
391 o.put("enabled", false);
392 o.put("users", new JSONArray());
// Presumably the non-null case: the flag mirrors acl.isActive() and the array
// is filled from acl.getUsers().
394 o.put("enabled", acl.isActive());
396 final JSONArray a = new JSONArray();
397 for (String user : acl.getUsers()) {
406 * @param dmaapContext
// Responds with the reader ACL of the named topic, rendered by aclToJson.
// A TopicExistsException carrying a "does not exist" message is raised on the
// not-found path.
410 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
411 throws IOException, ConfigDbException, TopicExistsException {
412 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
413 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding this error path is not visible in this
// view; presumably it fires when the lookup above returned null - confirm.
416 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
417 throw new TopicExistsException(
418 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
// Consumers are the users listed in the topic's reader ACL.
421 final NsaAcl acl = topic.getReaderAcl();
423 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
424 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
// Renders a topic as a JSON object with "name", "description", "owner",
// "readerAcl" and "writerAcl" entries; both ACLs go through aclToJson.
// Unlike getTopic, the ACLs are passed on without a null check here.
433 private static JSONObject topicToJson(Topic t) {
434 final JSONObject o = new JSONObject();
436 o.put("name", t.getName());
437 o.put("description", t.getDescription());
438 o.put("owner", t.getOwner());
439 o.put("readerAcl", aclToJson(t.getReaderAcl()));
440 o.put("writerAcl", aclToJson(t.getWriterAcl()));
446 * @param dmaapContext
449 * @throws ConfigDbException
450 * @throws IOException
451 * @throws TopicExistsException
452 * @throws AccessDeniedException
// Grants write access on the named topic to producerId by calling
// Topic.permitWritesFromUser with the API-key user resolved from the request,
// then responds with an HTML confirmation message.
// A TopicExistsException carrying a "does not exist" message is raised on the
// not-found path.
// NOTE(review): the AAF fallback below is commented out, so user may be null
// when it reaches permitWritesFromUser; how the callee treats a null user is
// not visible here - confirm.
457 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
458 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException {
460 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
461 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
463 // if (user == null) {
465 // LOGGER.info("Authenticating the user, as ACL authentication is not provided");
466 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
468 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
469 // String permission = aaf.aafPermissionString(topicName, "manage");
470 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
472 // LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic " + topicName
473 // + ". Authentication failed.");
474 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
475 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
476 // errorMessages.getNotPermitted1()+" <Grant publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
477 // LOGGER.info(errRes);
478 // throw new DMaaPAccessDeniedException(errRes);
482 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding this error path is not visible in this
// view; presumably it fires when the lookup above returned null - confirm.
485 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
486 + "] does not exist.");
487 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
488 + "] for topic. Topic [" + topicName + "] does not exist.");
491 topic.permitWritesFromUser(producerId, user);
493 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
494 + "]. Sending response.");
495 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
500 * @param dmaapContext
503 * @throws ConfigDbException
504 * @throws IOException
505 * @throws TopicExistsException
506 * @throws AccessDeniedException
507 * @throws DMaaPAccessDeniedException
// Revokes write access on the named topic from producerId by calling
// Topic.denyWritesFromUser with the API-key user resolved from the request,
// then responds with an HTML confirmation message.
// A TopicExistsException carrying a "does not exist" message is raised on the
// not-found path.
// NOTE(review): the AAF fallback below is commented out, so user may be null
// when it reaches denyWritesFromUser; how the callee treats a null user is
// not visible here - confirm.
511 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
512 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
514 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
515 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
516 // if (user == null) {
518 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
519 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
520 // String permission = aaf.aafPermissionString(topicName, "manage");
521 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
523 // LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic " + topicName
524 // + ". Authentication failed.");
525 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
526 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
527 // errorMessages.getNotPermitted1()+" <Revoke publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
528 // LOGGER.info(errRes);
529 // throw new DMaaPAccessDeniedException(errRes);
534 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding this error path is not visible in this
// view; presumably it fires when the lookup above returned null - confirm.
537 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
538 + "] does not exist.");
539 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
540 + "] for topic. Topic [" + topicName + "] does not exist.");
543 topic.denyWritesFromUser(producerId, user);
545 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
546 + "]. Sending response.");
547 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
552 * @param dmaapContext
555 * @throws DMaaPAccessDeniedException
// Grants read access on the named topic to consumerId by calling
// Topic.permitReadsByUser with the API-key user resolved from the request,
// then responds with an HTML confirmation naming the consumer and the topic.
// A TopicExistsException carrying a "does not exist" message is raised on the
// not-found path.
// NOTE(review): the AAF fallback below is commented out, so user may be null
// when it reaches permitReadsByUser; how the callee treats a null user is
// not visible here - confirm.
558 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
559 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
561 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
562 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
563 // if (user == null) {
565 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
566 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
567 // String permission = aaf.aafPermissionString(topicName, "manage");
568 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
570 // LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic " + topicName
571 // + ". Authentication failed.");
572 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
573 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
574 // errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
575 // LOGGER.info(errRes);
576 // throw new DMaaPAccessDeniedException(errRes);
580 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding this error path is not visible in this
// view; presumably it fires when the lookup above returned null - confirm.
583 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
584 + "] does not exist.");
585 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
586 + "] for topic. Topic [" + topicName + "] does not exist.");
589 topic.permitReadsByUser(consumerId, user);
591 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
592 + "]. Sending response.");
593 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
594 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
598 * @param dmaapContext
601 * @throws DMaaPAccessDeniedException
// Revokes read access on the named topic from consumerId by calling
// Topic.denyReadsByUser with the API-key user resolved from the request,
// then responds with an HTML confirmation naming the consumer and the topic.
// A TopicExistsException carrying a "does not exist" message is raised on the
// not-found path.
// NOTE(review): the AAF fallback below is commented out, so user may be null
// when it reaches denyReadsByUser; how the callee treats a null user is
// not visible here - confirm.
604 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
605 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
607 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
608 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
609 // if (user == null) {
610 //// String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
611 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
612 // String permission = aaf.aafPermissionString(topicName, "manage");
613 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
615 // LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic " + topicName
616 // + ". Authentication failed.");
617 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
618 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
619 // errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
620 // LOGGER.info(errRes);
621 // throw new DMaaPAccessDeniedException(errRes);
627 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the condition guarding this error path is not visible in this
// view; presumably it fires when the lookup above returned null - confirm.
630 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
631 + "] does not exist.");
// The exception text matches the log line above: this is the revoke path,
// not the permit path.
632 throw new TopicExistsException("Failed to revoke read access to consumer [" + consumerId
633 + "] for topic. Topic [" + topicName + "] does not exist.");
636 topic.denyReadsByUser(consumerId, user);
638 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
639 + "]. Sending response.");
640 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
641 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");