f2ba222639983e06e873b7a11f71b7b813bf862a
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / TopicServiceImpl.java
1 /**
2  * 
3  */
4 /*******************************************************************************
5  *  ============LICENSE_START=======================================================
6  *  org.onap.dmaap
7  *  ================================================================================
8  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
9  *  ================================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *        http://www.apache.org/licenses/LICENSE-2.0
14 *  
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *  ============LICENSE_END=========================================================
21  *  
22  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  *  
24  *******************************************************************************/
25 package org.onap.dmaap.dmf.mr.service.impl;
26
27 import java.io.IOException;
28
29 import org.apache.http.HttpStatus;
30 import org.json.JSONArray;
31 import org.json.JSONException;
32 import org.json.JSONObject;
33 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.stereotype.Service;
35
36 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
37 import org.onap.dmaap.dmf.mr.CambriaApiException;
38 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
39 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
40 import org.onap.dmaap.dmf.mr.beans.TopicBean;
41 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
42 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
43 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
44 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
45 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
46 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
47 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
48
49 import org.onap.dmaap.dmf.mr.metabroker.Topic;
50 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
51 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
52 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
53 import org.onap.dmaap.dmf.mr.service.TopicService;
54 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import com.att.eelf.configuration.EELFLogger;
57 import com.att.eelf.configuration.EELFManager;
58 import com.att.nsa.configs.ConfigDbException;
59 import com.att.nsa.security.NsaAcl;
60 import com.att.nsa.security.NsaApiKey;
61 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
62
63 /**
64  * @author muzainulhaque.qazi
65  *
66  */
67 @Service
68 public class TopicServiceImpl implements TopicService {
69
70         // private static final Logger LOGGER =
71         
72         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
73         @Autowired
74         private DMaaPErrorMessages errorMessages;
75
76         // @Value("${msgRtr.topicfactory.aaf}")
77         
78
79         public DMaaPErrorMessages getErrorMessages() {
80                 return errorMessages;
81         }
82
83         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
84                 this.errorMessages = errorMessages;
85         }
86
87         /**
88          * @param dmaapContext
89          * @throws JSONException
90          * @throws ConfigDbException
91          * @throws IOException
92          * 
93          */
94         @Override
95         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
96                 LOGGER.info("Fetching list of all the topics.");
97                 JSONObject json = new JSONObject();
98
99                 JSONArray topicsList = new JSONArray();
100
101                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
102                         topicsList.put(topic.getName());
103                 }
104
105                 json.put("topics", topicsList);
106
107                 LOGGER.info("Returning list of all the topics.");
108                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
109
110         }
111
112         /**
113          * @param dmaapContext
114          * @throws JSONException
115          * @throws ConfigDbException
116          * @throws IOException
117          * 
118          */
119         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
120
121                 LOGGER.info("Fetching list of all the topics.");
122                 JSONObject json = new JSONObject();
123
124                 JSONArray topicsList = new JSONArray();
125
126                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
127                         JSONObject obj = new JSONObject();
128                         obj.put("topicName", topic.getName());
129                         
130                         obj.put("owner", topic.getOwner());
131                         obj.put("txenabled", topic.isTransactionEnabled());
132                         topicsList.put(obj);
133                 }
134
135                 json.put("topics", topicsList);
136
137                 LOGGER.info("Returning list of all the topics.");
138                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
139
140         }
141
142         /**
143          * @param dmaapContext
144          * @param topicName
145          * @throws ConfigDbException
146          * @throws IOException
147          * @throws TopicExistsException
148          */
149         @Override
150         public void getTopic(DMaaPContext dmaapContext, String topicName)
151                         throws ConfigDbException, IOException, TopicExistsException {
152
153                 LOGGER.info("Fetching details of topic " + topicName);
154                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
155
156                 if (null == t) {
157                         LOGGER.error("Topic [" + topicName + "] does not exist.");
158                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
159                 }
160
161                 JSONObject o = new JSONObject();
162                 o.put("name", t.getName());
163                 o.put("description", t.getDescription());
164
165                 if (null != t.getOwners())
166                         o.put("owner", t.getOwners().iterator().next());
167                 if (null != t.getReaderAcl())
168                         o.put("readerAcl", aclToJson(t.getReaderAcl()));
169                 if (null != t.getWriterAcl())
170                         o.put("writerAcl", aclToJson(t.getWriterAcl()));
171
172                 LOGGER.info("Returning details of topic " + topicName);
173                 DMaaPResponseBuilder.respondOk(dmaapContext, o);
174
175         }
176
177         /**
178          * @param dmaapContext
179          * @param topicBean
180          * @throws CambriaApiException
181          * @throws AccessDeniedException
182          * @throws IOException
183          * @throws TopicExistsException
184          * @throws JSONException
185          * 
186          * 
187          * 
188          */
189         @Override
190         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
191                         throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
192                 LOGGER.info("Creating topic " + topicBean.getTopicName());
193
194                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
195                 String key = null;
196                 String appName = dmaapContext.getRequest().getHeader("AppName");
197                 String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
198                                 "enforced.topic.name.AAF");
199
200                 if (user != null) {
201                         key = user.getKey();
202
203                         if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
204
205                                 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
206
207                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
208                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
209                                                 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
210
211                                 LOGGER.info(errRes.toString());
212                                 // throw new DMaaPAccessDeniedException(errRes);
213
214                         }
215                 }
216                 // else if (user==null &&
217                 // (null==dmaapContext.getRequest().getHeader("Authorization") && null
218                 // == dmaapContext.getRequest().getHeader("cookie")) ) {
219                 else if (Utils.isCadiEnabled()&&user == null && null == dmaapContext.getRequest().getHeader("Authorization")
220                                 && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
221                         LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
222
223                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
224                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
225                                         "Failed to create topic: Access Denied.User does not have permission to perform create topic");
226
227                         LOGGER.info(errRes.toString());
228                         // throw new DMaaPAccessDeniedException(errRes);
229                 }
230
231                 if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization")
232                                 )) {
233                         // if (user == null &&
234                         // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
235                         // null != dmaapContext.getRequest().getHeader("cookie"))) {
236                         // ACL authentication is not provided so we will use the aaf
237                         // authentication
238                         LOGGER.info("Authorization the topic");
239
240                         String permission = "";
241                         String nameSpace = "";
242                         if (topicBean.getTopicName().indexOf(".") > 1)
243                                 nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
244
245                         String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
246                                         "msgRtr.topicfactory.aaf");
247
248                         // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
249
250                         permission = mrFactoryVal + nameSpace + "|create";
251                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
252
253                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
254
255                                 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
256
257                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
258                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
259                                                 "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
260                                                                 + permission);
261
262                                 LOGGER.info(errRes.toString());
263                                 throw new DMaaPAccessDeniedException(errRes);
264
265                         } else {
266                                 // if user is null and aaf authentication is ok then key should
267                                 // be ""
268                                 // key = "";
269                                 /**
270                                  * Added as part of AAF user it should return username
271                                  */
272
273                                 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
274                                 LOGGER.info("key ==================== " + key);
275
276                         }
277                 }
278
279                 try {
280                         final String topicName = topicBean.getTopicName();
281                         final String desc = topicBean.getTopicDescription();
282                         int partition = topicBean.getPartitionCount();
283                         // int replica = topicBean.getReplicationCount();
284                         if (partition == 0) {
285                                 partition = 1;
286                         }
287                         final int partitions = partition;
288
289                         int replica = topicBean.getReplicationCount();
290                         if (replica == 0) {
291                                 replica = 1;
292                         }
293                         final int replicas = replica;
294                         boolean transactionEnabled = topicBean.isTransactionEnabled();
295
296                         final Broker1 metabroker = getMetaBroker(dmaapContext);
297                         final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
298
299                         LOGGER.info("Topic created successfully. Sending response");
300                         DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
301                 } catch (JSONException excp) {
302
303                         LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
304                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
305                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
306                         LOGGER.info(errRes.toString());
307                         throw new CambriaApiException(errRes);
308
309                 } catch (ConfigDbException excp1) {
310
311                         LOGGER.error("Failed to create topic.  Config DB Exception", excp1);
312                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
313                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
314                         LOGGER.info(errRes.toString());
315                         throw new CambriaApiException(errRes);
316                 } catch (org.onap.dmaap.dmf.mr.metabroker.Broker1.TopicExistsException e) {
317                         // TODO Auto-generated catch block
318                         LOGGER.error( e.getMessage());
319                 }
320         }
321
322         /**
323          * @param dmaapContext
324          * @param topicName
325          * @throws ConfigDbException
326          * @throws IOException
327          * @throws TopicExistsException
328          * @throws CambriaApiException
329          * @throws AccessDeniedException
330          */
331         @Override
332         public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
333                         CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
334
335
336                 LOGGER.info(" Deleting topic " + topicName);
337                 /*if (true) { // {
338                         LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
339                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
340                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
341                                                         + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
342                         LOGGER.info(errRes.toString());
343                         throw new DMaaPAccessDeniedException(errRes);
344                 }*/
345
346                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
347
348                 if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
349                         LOGGER.info("Authenticating the user, as ACL authentication is not provided");
350                         // String permission =
351                         
352                         String permission = "";
353                         String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
354                         String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
355                                         "msgRtr.topicfactory.aaf");
356                         
357                         permission = mrFactoryVal + nameSpace + "|destroy";
358                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
359                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
360                                 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
361                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
362                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
363                                                 errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
364                                                                 + errorMessages.getNotPermitted2());
365                                 LOGGER.info(errRes.toString());
366                                 throw new DMaaPAccessDeniedException(errRes);
367                         }
368
369                 }
370
371                 final Broker1 metabroker = getMetaBroker(dmaapContext);
372                 final Topic topic = metabroker.getTopic(topicName);
373
374                 if (topic == null) {
375                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
376                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
377                 }
378
379                 // metabroker.deleteTopic(topicName);
380
381                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
382                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
383         }
384
385         /**
386          * 
387          * @param dmaapContext
388          * @return
389          */
390         private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
391                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
392         }
393
394         /**
395          * @param dmaapContext
396          * @param topicName
397          * @throws ConfigDbException
398          * @throws IOException
399          * @throws TopicExistsException
400          * 
401          */
402         @Override
403         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
404                         throws ConfigDbException, IOException, TopicExistsException {
405                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
406                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
407
408                 if (topic == null) {
409                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
410                         throw new TopicExistsException(
411                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
412                 }
413
414                 final NsaAcl acl = topic.getWriterAcl();
415
416                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
417                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
418
419         }
420
421         /**
422          * 
423          * @param acl
424          * @return
425          */
426         private static JSONObject aclToJson(NsaAcl acl) {
427                 final JSONObject o = new JSONObject();
428                 if (acl == null) {
429                         o.put("enabled", false);
430                         o.put("users", new JSONArray());
431                 } else {
432                         o.put("enabled", acl.isActive());
433
434                         final JSONArray a = new JSONArray();
435                         for (String user : acl.getUsers()) {
436                                 a.put(user);
437                         }
438                         o.put("users", a);
439                 }
440                 return o;
441         }
442
443         /**
444          * @param dmaapContext
445          * @param topicName
446          */
447         @Override
448         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
449                         throws IOException, ConfigDbException, TopicExistsException {
450                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
451                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
452
453                 if (topic == null) {
454                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
455                         throw new TopicExistsException(
456                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
457                 }
458
459                 final NsaAcl acl = topic.getReaderAcl();
460
461                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
462                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
463
464         }
465
466         /**
467          * 
468          * @param t
469          * @return
470          */
471         private static JSONObject topicToJson(Topic t) {
472                 final JSONObject o = new JSONObject();
473
474                 o.put("name", t.getName());
475                 o.put("description", t.getDescription());
476                 o.put("owner", t.getOwner());
477                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
478                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
479
480                 return o;
481         }
482
483         /**
484          * @param dmaapContext
485          *                      @param topicName @param producerId @throws
486          *            ConfigDbException @throws IOException @throws
487          *            TopicExistsException @throws AccessDeniedException @throws
488          * 
489          */
490         @Override
491         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
492                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
493
494                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
495                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
496
497                 
498                 //
499                 // LOGGER.info("Authenticating the user, as ACL authentication is not
500                 
501                 //// String permission =
502                 
503                 //
504                 
505                 
506                 
507                 // {
508                 // LOGGER.error("Failed to permit write access to producer [" +
509                 // producerId + "] for topic " + topicName
510                 
511                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
512                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
513                 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
514                 
515                 
516                 
517                 // }
518                 // }
519
520                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
521
522                 if (null == topic) {
523                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
524                                         + "] does not exist.");
525                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
526                                         + "] for topic. Topic [" + topicName + "] does not exist.");
527                 }
528
529                 topic.permitWritesFromUser(producerId, user);
530
531                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
532                                 + "]. Sending response.");
533                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
534
535         }
536
537         /**
538          * @param dmaapContext
539          * @param topicName
540          * @param producerId
541          * @throws ConfigDbException
542          * @throws IOException
543          * @throws TopicExistsException
544          * @throws AccessDeniedException
545          * @throws DMaaPAccessDeniedException
546          * 
547          */
548         @Override
549         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
550                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
551                         DMaaPAccessDeniedException {
552
553                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
554                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
555                 
556                 //
557                 //// String permission =
558                 
559                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
560                 // String permission = aaf.aafPermissionString(topicName, "manage");
561                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
562                 // {
563                 // LOGGER.error("Failed to revoke write access to producer [" +
564                 // producerId + "] for topic " + topicName
565                 
566                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
567                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
568                 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
569                 
570                 
571                 // throw new DMaaPAccessDeniedException(errRes);
572                 //
573         
574                 // }
575
576                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
577
578                 if (null == topic) {
579                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
580                                         + "] does not exist.");
581                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
582                                         + "] for topic. Topic [" + topicName + "] does not exist.");
583                 }
584
585                 topic.denyWritesFromUser(producerId, user);
586
587                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
588                                 + "]. Sending response.");
589                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
590
591         }
592
593         /**
594          * @param dmaapContext
595          * @param topicName
596          * @param consumerId
597          * @throws DMaaPAccessDeniedException
598          */
599         @Override
600         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
601                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
602                         DMaaPAccessDeniedException {
603
604                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
605                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
606                 
607                 //
608                 //// String permission =
609                 
610                 
611                 // String permission = aaf.aafPermissionString(topicName, "manage");
612                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
613                 // {
614                 // LOGGER.error("Failed to permit read access to consumer [" +
615                 // consumerId + "] for topic " + topicName
616                 
617                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
618                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
619                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
620                 
621                 
622                 
623                 // }
624                 // }
625
626                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
627
628                 if (null == topic) {
629                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
630                                         + "] does not exist.");
631                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
632                                         + "] for topic. Topic [" + topicName + "] does not exist.");
633                 }
634
635                 topic.permitReadsByUser(consumerId, user);
636
637                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
638                                 + "]. Sending response.");
639                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
640                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
641         }
642
643         /**
644          * @param dmaapContext
645          * @param topicName
646          * @param consumerId
647          * @throws DMaaPAccessDeniedException
648          */
649         @Override
650         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
651                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
652                         DMaaPAccessDeniedException {
653
654                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
655                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
656                 
657                 //// String permission =
658                 
659                 
660                 // String permission = aaf.aafPermissionString(topicName, "manage");
661                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
662                 // {
663                 // LOGGER.error("Failed to revoke read access to consumer [" +
664                 // consumerId + "] for topic " + topicName
665                 
666                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
667                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
668                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
669                 
670                 
671                 // throw new DMaaPAccessDeniedException(errRes);
672                 // }
673                 //
674                 //
675         
676                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
677
678                 if (null == topic) {
679                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
680                                         + "] does not exist.");
681                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
682                                         + "] for topic. Topic [" + topicName + "] does not exist.");
683                 }
684
685                 topic.denyReadsByUser(consumerId, user);
686
687                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
688                                 + "]. Sending response.");
689                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
690                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
691
692         }
693
694 }