f6d7b212adf62bc6f9380590f14c59a57e50c6bb
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / service / impl / TopicServiceImpl.java
1 /**
2  * 
3  */
4 /*******************************************************************************
5  *  ============LICENSE_START=======================================================
6  *  org.onap.dmaap
7  *  ================================================================================
8  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
9  *  ================================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *        http://www.apache.org/licenses/LICENSE-2.0
14 *  
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *  ============LICENSE_END=========================================================
21  *  
22  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  *  
24  *******************************************************************************/
25 package com.att.dmf.mr.service.impl;
26
27 import java.io.IOException;
28
29 import org.apache.http.HttpStatus;
30 import org.json.JSONArray;
31 import org.json.JSONException;
32 import org.json.JSONObject;
33 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.stereotype.Service;
35
36 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
37 import com.att.dmf.mr.CambriaApiException;
38 import com.att.dmf.mr.beans.DMaaPContext;
39 import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
40 import com.att.dmf.mr.beans.TopicBean;
41 import com.att.dmf.mr.constants.CambriaConstants;
42 import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
43 import com.att.dmf.mr.exception.DMaaPErrorMessages;
44 import com.att.dmf.mr.exception.DMaaPResponseCode;
45 import com.att.dmf.mr.exception.ErrorResponse;
46 import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
47 import com.att.dmf.mr.metabroker.Broker1;
48
49 import com.att.dmf.mr.metabroker.Topic;
50 import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
51 import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
52 import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
53 import com.att.dmf.mr.service.TopicService;
54 import com.att.dmf.mr.utils.DMaaPResponseBuilder;
55 import com.att.eelf.configuration.EELFLogger;
56 import com.att.eelf.configuration.EELFManager;
57 import com.att.nsa.configs.ConfigDbException;
58 import com.att.nsa.security.NsaAcl;
59 import com.att.nsa.security.NsaApiKey;
60 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
61
62 /**
63  * @author muzainulhaque.qazi
64  *
65  */
66 @Service
67 public class TopicServiceImpl implements TopicService {
68
69         // private static final Logger LOGGER =
70         
71         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
72         @Autowired
73         private DMaaPErrorMessages errorMessages;
74
75         // @Value("${msgRtr.topicfactory.aaf}")
76         
77
78         public DMaaPErrorMessages getErrorMessages() {
79                 return errorMessages;
80         }
81
82         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
83                 this.errorMessages = errorMessages;
84         }
85
86         /**
87          * @param dmaapContext
88          * @throws JSONException
89          * @throws ConfigDbException
90          * @throws IOException
91          * 
92          */
93         @Override
94         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
95                 LOGGER.info("Fetching list of all the topics.");
96                 JSONObject json = new JSONObject();
97
98                 JSONArray topicsList = new JSONArray();
99
100                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
101                         topicsList.put(topic.getName());
102                 }
103
104                 json.put("topics", topicsList);
105
106                 LOGGER.info("Returning list of all the topics.");
107                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
108
109         }
110
111         /**
112          * @param dmaapContext
113          * @throws JSONException
114          * @throws ConfigDbException
115          * @throws IOException
116          * 
117          */
118         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
119
120                 LOGGER.info("Fetching list of all the topics.");
121                 JSONObject json = new JSONObject();
122
123                 JSONArray topicsList = new JSONArray();
124
125                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
126                         JSONObject obj = new JSONObject();
127                         obj.put("topicName", topic.getName());
128                         
129                         obj.put("owner", topic.getOwner());
130                         obj.put("txenabled", topic.isTransactionEnabled());
131                         topicsList.put(obj);
132                 }
133
134                 json.put("topics", topicsList);
135
136                 LOGGER.info("Returning list of all the topics.");
137                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
138
139         }
140
141         /**
142          * @param dmaapContext
143          * @param topicName
144          * @throws ConfigDbException
145          * @throws IOException
146          * @throws TopicExistsException
147          */
148         @Override
149         public void getTopic(DMaaPContext dmaapContext, String topicName)
150                         throws ConfigDbException, IOException, TopicExistsException {
151
152                 LOGGER.info("Fetching details of topic " + topicName);
153                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
154
155                 if (null == t) {
156                         LOGGER.error("Topic [" + topicName + "] does not exist.");
157                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
158                 }
159
160                 JSONObject o = new JSONObject();
161                 o.put("name", t.getName());
162                 o.put("description", t.getDescription());
163
164                 if (null != t.getOwners())
165                         o.put("owner", t.getOwners().iterator().next());
166                 if (null != t.getReaderAcl())
167                         o.put("readerAcl", aclToJson(t.getReaderAcl()));
168                 if (null != t.getWriterAcl())
169                         o.put("writerAcl", aclToJson(t.getWriterAcl()));
170
171                 LOGGER.info("Returning details of topic " + topicName);
172                 DMaaPResponseBuilder.respondOk(dmaapContext, o);
173
174         }
175
176         /**
177          * @param dmaapContext
178          * @param topicBean
179          * @throws CambriaApiException
180          * @throws AccessDeniedException
181          * @throws IOException
182          * @throws TopicExistsException
183          * @throws JSONException
184          * 
185          * 
186          * 
187          */
188         @Override
189         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
190                         throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
191                 LOGGER.info("Creating topic " + topicBean.getTopicName());
192
193                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
194                 String key = null;
195                 String appName = dmaapContext.getRequest().getHeader("AppName");
196                 String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
197                                 "enforced.topic.name.AAF");
198
199                 if (user != null) {
200                         key = user.getKey();
201
202                         if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
203
204                                 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
205
206                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
207                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
208                                                 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
209
210                                 LOGGER.info(errRes.toString());
211                                 // throw new DMaaPAccessDeniedException(errRes);
212
213                         }
214                 }
215                 // else if (user==null &&
216                 // (null==dmaapContext.getRequest().getHeader("Authorization") && null
217                 // == dmaapContext.getRequest().getHeader("cookie")) ) {
218                 else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization")
219                                 && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
220                         LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
221
222                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
223                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
224                                         "Failed to create topic: Access Denied.User does not have permission to perform create topic");
225
226                         LOGGER.info(errRes.toString());
227                         // throw new DMaaPAccessDeniedException(errRes);
228                 }
229
230                 if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization")
231                                 )) {
232                         // if (user == null &&
233                         // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
234                         // null != dmaapContext.getRequest().getHeader("cookie"))) {
235                         // ACL authentication is not provided so we will use the aaf
236                         // authentication
237                         LOGGER.info("Authorization the topic");
238
239                         String permission = "";
240                         String nameSpace = "";
241                         if (topicBean.getTopicName().indexOf(".") > 1)
242                                 nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
243
244                         String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
245                                         "msgRtr.topicfactory.aaf");
246
247                         // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
248
249                         permission = mrFactoryVal + nameSpace + "|create";
250                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
251
252                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
253
254                                 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
255
256                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
257                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
258                                                 "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
259                                                                 + permission);
260
261                                 LOGGER.info(errRes.toString());
262                                 throw new DMaaPAccessDeniedException(errRes);
263
264                         } else {
265                                 // if user is null and aaf authentication is ok then key should
266                                 // be ""
267                                 // key = "";
268                                 /**
269                                  * Added as part of AAF user it should return username
270                                  */
271
272                                 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
273                                 LOGGER.info("key ==================== " + key);
274
275                         }
276                 }
277
278                 try {
279                         final String topicName = topicBean.getTopicName();
280                         final String desc = topicBean.getTopicDescription();
281                         int partition = topicBean.getPartitionCount();
282                         // int replica = topicBean.getReplicationCount();
283                         if (partition == 0) {
284                                 partition = 1;
285                         }
286                         final int partitions = partition;
287
288                         int replica = topicBean.getReplicationCount();
289                         if (replica == 0) {
290                                 replica = 1;
291                         }
292                         final int replicas = replica;
293                         boolean transactionEnabled = topicBean.isTransactionEnabled();
294
295                         final Broker1 metabroker = getMetaBroker(dmaapContext);
296                         final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
297
298                         LOGGER.info("Topic created successfully. Sending response");
299                         DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
300                 } catch (JSONException excp) {
301
302                         LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
303                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
304                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
305                         LOGGER.info(errRes.toString());
306                         throw new CambriaApiException(errRes);
307
308                 } catch (ConfigDbException excp1) {
309
310                         LOGGER.error("Failed to create topic.  Config DB Exception", excp1);
311                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
312                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
313                         LOGGER.info(errRes.toString());
314                         throw new CambriaApiException(errRes);
315                 } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
316                         // TODO Auto-generated catch block
317                         LOGGER.error( e.getMessage());
318                 }
319         }
320
321         /**
322          * @param dmaapContext
323          * @param topicName
324          * @throws ConfigDbException
325          * @throws IOException
326          * @throws TopicExistsException
327          * @throws CambriaApiException
328          * @throws AccessDeniedException
329          */
330         @Override
331         public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
332                         CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
333
334
335                 LOGGER.info(" Deleting topic " + topicName);
336                 /*if (true) { // {
337                         LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
338                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
339                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
340                                                         + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
341                         LOGGER.info(errRes.toString());
342                         throw new DMaaPAccessDeniedException(errRes);
343                 }*/
344
345                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
346
347                 if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
348                         LOGGER.info("Authenticating the user, as ACL authentication is not provided");
349                         // String permission =
350                         
351                         String permission = "";
352                         String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
353                         String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
354                                         "msgRtr.topicfactory.aaf");
355                         
356                         permission = mrFactoryVal + nameSpace + "|destroy";
357                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
358                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
359                                 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
360                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
361                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
362                                                 errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
363                                                                 + errorMessages.getNotPermitted2());
364                                 LOGGER.info(errRes.toString());
365                                 throw new DMaaPAccessDeniedException(errRes);
366                         }
367
368                 }
369
370                 final Broker1 metabroker = getMetaBroker(dmaapContext);
371                 final Topic topic = metabroker.getTopic(topicName);
372
373                 if (topic == null) {
374                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
375                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
376                 }
377
378                 // metabroker.deleteTopic(topicName);
379
380                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
381                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
382         }
383
384         /**
385          * 
386          * @param dmaapContext
387          * @return
388          */
389         private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
390                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
391         }
392
393         /**
394          * @param dmaapContext
395          * @param topicName
396          * @throws ConfigDbException
397          * @throws IOException
398          * @throws TopicExistsException
399          * 
400          */
401         @Override
402         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
403                         throws ConfigDbException, IOException, TopicExistsException {
404                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
405                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
406
407                 if (topic == null) {
408                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
409                         throw new TopicExistsException(
410                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
411                 }
412
413                 final NsaAcl acl = topic.getWriterAcl();
414
415                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
416                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
417
418         }
419
420         /**
421          * 
422          * @param acl
423          * @return
424          */
425         private static JSONObject aclToJson(NsaAcl acl) {
426                 final JSONObject o = new JSONObject();
427                 if (acl == null) {
428                         o.put("enabled", false);
429                         o.put("users", new JSONArray());
430                 } else {
431                         o.put("enabled", acl.isActive());
432
433                         final JSONArray a = new JSONArray();
434                         for (String user : acl.getUsers()) {
435                                 a.put(user);
436                         }
437                         o.put("users", a);
438                 }
439                 return o;
440         }
441
442         /**
443          * @param dmaapContext
444          * @param topicName
445          */
446         @Override
447         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
448                         throws IOException, ConfigDbException, TopicExistsException {
449                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
450                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
451
452                 if (topic == null) {
453                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
454                         throw new TopicExistsException(
455                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
456                 }
457
458                 final NsaAcl acl = topic.getReaderAcl();
459
460                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
461                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
462
463         }
464
465         /**
466          * 
467          * @param t
468          * @return
469          */
470         private static JSONObject topicToJson(Topic t) {
471                 final JSONObject o = new JSONObject();
472
473                 o.put("name", t.getName());
474                 o.put("description", t.getDescription());
475                 o.put("owner", t.getOwner());
476                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
477                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
478
479                 return o;
480         }
481
482         /**
483          * @param dmaapContext
484          *                      @param topicName @param producerId @throws
485          *            ConfigDbException @throws IOException @throws
486          *            TopicExistsException @throws AccessDeniedException @throws
487          * 
488          */
489         @Override
490         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
491                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
492
493                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
494                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
495
496                 
497                 //
498                 // LOGGER.info("Authenticating the user, as ACL authentication is not
499                 
500                 //// String permission =
501                 
502                 //
503                 
504                 
505                 
506                 // {
507                 // LOGGER.error("Failed to permit write access to producer [" +
508                 // producerId + "] for topic " + topicName
509                 
510                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
511                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
512                 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
513                 
514                 
515                 
516                 // }
517                 // }
518
519                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
520
521                 if (null == topic) {
522                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
523                                         + "] does not exist.");
524                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
525                                         + "] for topic. Topic [" + topicName + "] does not exist.");
526                 }
527
528                 topic.permitWritesFromUser(producerId, user);
529
530                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
531                                 + "]. Sending response.");
532                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
533
534         }
535
536         /**
537          * @param dmaapContext
538          * @param topicName
539          * @param producerId
540          * @throws ConfigDbException
541          * @throws IOException
542          * @throws TopicExistsException
543          * @throws AccessDeniedException
544          * @throws DMaaPAccessDeniedException
545          * 
546          */
547         @Override
548         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
549                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
550                         DMaaPAccessDeniedException {
551
552                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
553                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
554                 
555                 //
556                 //// String permission =
557                 
558                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
559                 // String permission = aaf.aafPermissionString(topicName, "manage");
560                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
561                 // {
562                 // LOGGER.error("Failed to revoke write access to producer [" +
563                 // producerId + "] for topic " + topicName
564                 
565                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
566                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
567                 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
568                 
569                 
570                 // throw new DMaaPAccessDeniedException(errRes);
571                 //
572         
573                 // }
574
575                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
576
577                 if (null == topic) {
578                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
579                                         + "] does not exist.");
580                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
581                                         + "] for topic. Topic [" + topicName + "] does not exist.");
582                 }
583
584                 topic.denyWritesFromUser(producerId, user);
585
586                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
587                                 + "]. Sending response.");
588                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
589
590         }
591
592         /**
593          * @param dmaapContext
594          * @param topicName
595          * @param consumerId
596          * @throws DMaaPAccessDeniedException
597          */
598         @Override
599         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
600                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
601                         DMaaPAccessDeniedException {
602
603                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
604                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
605                 
606                 //
607                 //// String permission =
608                 
609                 
610                 // String permission = aaf.aafPermissionString(topicName, "manage");
611                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
612                 // {
613                 // LOGGER.error("Failed to permit read access to consumer [" +
614                 // consumerId + "] for topic " + topicName
615                 
616                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
617                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
618                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
619                 
620                 
621                 
622                 // }
623                 // }
624
625                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
626
627                 if (null == topic) {
628                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
629                                         + "] does not exist.");
630                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
631                                         + "] for topic. Topic [" + topicName + "] does not exist.");
632                 }
633
634                 topic.permitReadsByUser(consumerId, user);
635
636                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
637                                 + "]. Sending response.");
638                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
639                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
640         }
641
642         /**
643          * @param dmaapContext
644          * @param topicName
645          * @param consumerId
646          * @throws DMaaPAccessDeniedException
647          */
648         @Override
649         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
650                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
651                         DMaaPAccessDeniedException {
652
653                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
654                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
655                 
656                 //// String permission =
657                 
658                 
659                 // String permission = aaf.aafPermissionString(topicName, "manage");
660                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
661                 // {
662                 // LOGGER.error("Failed to revoke read access to consumer [" +
663                 // consumerId + "] for topic " + topicName
664                 
665                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
666                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
667                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
668                 
669                 
670                 // throw new DMaaPAccessDeniedException(errRes);
671                 // }
672                 //
673                 //
674         
675                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
676
677                 if (null == topic) {
678                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
679                                         + "] does not exist.");
680                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
681                                         + "] for topic. Topic [" + topicName + "] does not exist.");
682                 }
683
684                 topic.denyReadsByUser(consumerId, user);
685
686                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
687                                 + "]. Sending response.");
688                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
689                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
690
691         }
692
693 }