Fixes for Kafka upgrade issues
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / service / impl / TopicServiceImpl.java
1 /**
2  * 
3  */
4 /*******************************************************************************
5  *  ============LICENSE_START=======================================================
6  *  org.onap.dmaap
7  *  ================================================================================
8  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
9  *  ================================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *        http://www.apache.org/licenses/LICENSE-2.0
14 *  
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *  ============LICENSE_END=========================================================
21  *  
22  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  *  
24  *******************************************************************************/
25 package com.att.dmf.mr.service.impl;
26
27 import java.io.IOException;
28
29 import org.apache.http.HttpStatus;
30 import org.json.JSONArray;
31 import org.json.JSONException;
32 import org.json.JSONObject;
33 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.stereotype.Service;
35
36 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
37 import com.att.dmf.mr.CambriaApiException;
38 import com.att.dmf.mr.beans.DMaaPContext;
39 import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
40 import com.att.dmf.mr.beans.TopicBean;
41 import com.att.dmf.mr.constants.CambriaConstants;
42 import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
43 import com.att.dmf.mr.exception.DMaaPErrorMessages;
44 import com.att.dmf.mr.exception.DMaaPResponseCode;
45 import com.att.dmf.mr.exception.ErrorResponse;
46 import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
47 import com.att.dmf.mr.metabroker.Broker1;
48 //import com.att.dmf.mr.metabroker.Broker1;
49 import com.att.dmf.mr.metabroker.Topic;
50 import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
51 import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
52 import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
53 import com.att.dmf.mr.service.TopicService;
54 import com.att.dmf.mr.utils.DMaaPResponseBuilder;
55 import com.att.eelf.configuration.EELFLogger;
56 import com.att.eelf.configuration.EELFManager;
57 import com.att.nsa.configs.ConfigDbException;
58 import com.att.nsa.security.NsaAcl;
59 import com.att.nsa.security.NsaApiKey;
60 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
61
62 /**
63  * @author muzainulhaque.qazi
64  *
65  */
66 @Service
67 public class TopicServiceImpl implements TopicService {
68
69         // private static final Logger LOGGER =
70         // Logger.getLogger(TopicServiceImpl.class);
71         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
72         @Autowired
73         private DMaaPErrorMessages errorMessages;
74
75         // @Value("${msgRtr.topicfactory.aaf}")
76         // private String mrFactory;
77
78         public DMaaPErrorMessages getErrorMessages() {
79                 return errorMessages;
80         }
81
82         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
83                 this.errorMessages = errorMessages;
84         }
85
86         /**
87          * @param dmaapContext
88          * @throws JSONException
89          * @throws ConfigDbException
90          * @throws IOException
91          * 
92          */
93         @Override
94         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
95                 LOGGER.info("Fetching list of all the topics.");
96                 JSONObject json = new JSONObject();
97
98                 JSONArray topicsList = new JSONArray();
99
100                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
101                         topicsList.put(topic.getName());
102                 }
103
104                 json.put("topics", topicsList);
105
106                 LOGGER.info("Returning list of all the topics.");
107                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
108
109         }
110
111         /**
112          * @param dmaapContext
113          * @throws JSONException
114          * @throws ConfigDbException
115          * @throws IOException
116          * 
117          */
118         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
119
120                 LOGGER.info("Fetching list of all the topics.");
121                 JSONObject json = new JSONObject();
122
123                 JSONArray topicsList = new JSONArray();
124
125                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
126                         JSONObject obj = new JSONObject();
127                         obj.put("topicName", topic.getName());
128                         // obj.put("description", topic.getDescription());
129                         obj.put("owner", topic.getOwner());
130                         obj.put("txenabled", topic.isTransactionEnabled());
131                         topicsList.put(obj);
132                 }
133
134                 json.put("topics", topicsList);
135
136                 LOGGER.info("Returning list of all the topics.");
137                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
138
139         }
140
141         /**
142          * @param dmaapContext
143          * @param topicName
144          * @throws ConfigDbException
145          * @throws IOException
146          * @throws TopicExistsException
147          */
148         @Override
149         public void getTopic(DMaaPContext dmaapContext, String topicName)
150                         throws ConfigDbException, IOException, TopicExistsException {
151
152                 LOGGER.info("Fetching details of topic " + topicName);
153                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
154
155                 if (null == t) {
156                         LOGGER.error("Topic [" + topicName + "] does not exist.");
157                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
158                 }
159
160                 JSONObject o = new JSONObject();
161                 o.put("name", t.getName());
162                 o.put("description", t.getDescription());
163
164                 if (null != t.getOwners())
165                         o.put("owner", t.getOwners().iterator().next());
166                 if (null != t.getReaderAcl())
167                         o.put("readerAcl", aclToJson(t.getReaderAcl()));
168                 if (null != t.getWriterAcl())
169                         o.put("writerAcl", aclToJson(t.getWriterAcl()));
170
171                 LOGGER.info("Returning details of topic " + topicName);
172                 DMaaPResponseBuilder.respondOk(dmaapContext, o);
173
174         }
175
176         /**
177          * @param dmaapContext
178          * @param topicBean
179          * @throws CambriaApiException
180          * @throws AccessDeniedException
181          * @throws IOException
182          * @throws TopicExistsException
183          * @throws JSONException
184          * 
185          * 
186          * 
187          */
188         @Override
189         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
190                         throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
191
192                 LOGGER.info("Creating topic " + topicBean.getTopicName());
193
194                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
195                 String key = null;
196                 //String appName = dmaapContext.getRequest().getHeader("AppName");
197                 String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
198                                 "enforced.topic.name.AAF");
199
200                 if (user != null) {
201                         key = user.getKey();
202
203                         if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
204
205                                 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
206
207                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
208                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
209                                                 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
210
211                                 LOGGER.info(errRes.toString());
212                                 // throw new DMaaPAccessDeniedException(errRes);
213
214                         }
215                 }
216                 // else if (user==null &&
217                 // (null==dmaapContext.getRequest().getHeader("Authorization") && null
218                 // == dmaapContext.getRequest().getHeader("cookie")) ) {
219                 /*else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization")
220                                 ) {
221                         LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
222
223                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
224                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
225                                         "Failed to create topic: Access Denied.User does not have permission to perform create topic");
226
227                         LOGGER.info(errRes.toString());
228                         // throw new DMaaPAccessDeniedException(errRes);
229                 }*/
230
231                 if (user == null /*&& (null != dmaapContext.getRequest().getHeader("Authorization")
232                                 )*/) {
233                         // if (user == null &&
234                         // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
235                         // null != dmaapContext.getRequest().getHeader("cookie"))) {
236                         // ACL authentication is not provided so we will use the aaf
237                         // authentication
238                         /*LOGGER.info("Authorization the topic");
239
240                         String permission = "";
241                         String nameSpace = "";
242                         if (topicBean.getTopicName().indexOf(".") > 1)
243                                 nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
244
245                         String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
246                                         "msgRtr.topicfactory.aaf");
247
248                         // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
249
250                         permission = mrFactoryVal + nameSpace + "|create";
251                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();*/
252
253                         //if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
254                         if (false) {
255                                 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
256
257                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
258                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
259                                                 "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
260                                                                 //+ permission);
261                                                 + "permission");
262                                                 
263
264                                 LOGGER.info(errRes.toString());
265                                 throw new DMaaPAccessDeniedException(errRes);
266
267                         } else {
268                                 // if user is null and aaf authentication is ok then key should
269                                 // be ""
270                                 // key = "";
271                                 /**
272                                  * Added as part of AAF user it should return username
273                                  */
274
275                                 //key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
276                                 //key="admin";
277                                 //LOGGER.info("key ==================== " + key);
278
279                         }
280                 }
281
282                 try {
283                         final String topicName = topicBean.getTopicName();
284                         final String desc = topicBean.getTopicDescription();
285                         int partition = topicBean.getPartitionCount();
286                         // int replica = topicBean.getReplicationCount();
287                         if (partition == 0) {
288                                 partition = 8;
289                         }
290                         final int partitions = partition;
291
292                         int replica = topicBean.getReplicationCount();
293                         if (replica == 0) {
294                                 //replica = 3;
295                                 replica = 1;
296                         }
297                         final int replicas = replica;
298                         boolean transactionEnabled = topicBean.isTransactionEnabled();
299
300                         final Broker1 metabroker = getMetaBroker(dmaapContext);
301                         final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
302
303                         LOGGER.info("Topic created successfully. Sending response");
304                         DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
305                 } catch (JSONException excp) {
306
307                         LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
308                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
309                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
310                         LOGGER.info(errRes.toString());
311                         throw new CambriaApiException(errRes);
312
313                 } catch (ConfigDbException excp1) {
314
315                         LOGGER.error("Failed to create topic.  Config DB Exception", excp1);
316                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
317                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
318                         LOGGER.info(errRes.toString());
319                         throw new CambriaApiException(errRes);
320                 } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
321                         // TODO Auto-generated catch block
322                         e.printStackTrace();
323                 }
324         }
325
326         /**
327          * @param dmaapContext
328          * @param topicName
329          * @throws ConfigDbException
330          * @throws IOException
331          * @throws TopicExistsException
332          * @throws CambriaApiException
333          * @throws AccessDeniedException
334          */
335         @Override
336         public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
337                         CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
338
339                 LOGGER.info(" Deleting topic " + topicName);
340                 /*if (true) { // {
341                         LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
342                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
343                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
344                                                         + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
345                         LOGGER.info(errRes.toString());
346                         throw new DMaaPAccessDeniedException(errRes);
347                 }*/
348
349                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
350
351                 /*if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
352                         LOGGER.info("Authenticating the user, as ACL authentication is not provided");
353                         // String permission =
354                         // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
355                         String permission = "";
356                         String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
357                         String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
358                                         "msgRtr.topicfactory.aaf");
359                         // String tokens[] = topicName.split(".mr.topic.");
360                         permission = mrFactoryVal + nameSpace + "|destroy";
361                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
362                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
363                                 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
364                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
365                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
366                                                 errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
367                                                                 + errorMessages.getNotPermitted2());
368                                 LOGGER.info(errRes.toString());
369                                 throw new DMaaPAccessDeniedException(errRes);
370                         }
371
372                 }*/
373
374                 final Broker1 metabroker = getMetaBroker(dmaapContext);
375                 final Topic topic = metabroker.getTopic(topicName);
376
377                 if (topic == null) {
378                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
379                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
380                 }
381
382                  try {
383                         metabroker.deleteTopic(topicName);
384                 } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
385                         // TODO Auto-generated catch block
386                         throw new CambriaApiException(500, "failed to delete the topic");
387                 }
388
389                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
390                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
391
392         }
393
394         /**
395          * 
396          * @param dmaapContext
397          * @return
398          */
399         private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
400                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
401         }
402
403         /**
404          * @param dmaapContext
405          * @param topicName
406          * @throws ConfigDbException
407          * @throws IOException
408          * @throws TopicExistsException
409          * 
410          */
411         @Override
412         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
413                         throws ConfigDbException, IOException, TopicExistsException {
414                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
415                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
416
417                 if (topic == null) {
418                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
419                         throw new TopicExistsException(
420                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
421                 }
422
423                 final NsaAcl acl = topic.getWriterAcl();
424
425                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
426                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
427
428         }
429
430         /**
431          * 
432          * @param acl
433          * @return
434          */
435         private static JSONObject aclToJson(NsaAcl acl) {
436                 final JSONObject o = new JSONObject();
437                 if (acl == null) {
438                         o.put("enabled", false);
439                         o.put("users", new JSONArray());
440                 } else {
441                         o.put("enabled", acl.isActive());
442
443                         final JSONArray a = new JSONArray();
444                         for (String user : acl.getUsers()) {
445                                 a.put(user);
446                         }
447                         o.put("users", a);
448                 }
449                 return o;
450         }
451
452         /**
453          * @param dmaapContext
454          * @param topicName
455          */
456         @Override
457         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
458                         throws IOException, ConfigDbException, TopicExistsException {
459                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
460                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
461
462                 if (topic == null) {
463                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
464                         throw new TopicExistsException(
465                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
466                 }
467
468                 final NsaAcl acl = topic.getReaderAcl();
469
470                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
471                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
472
473         }
474
475         /**
476          * 
477          * @param t
478          * @return
479          */
480         private static JSONObject topicToJson(Topic t) {
481                 final JSONObject o = new JSONObject();
482
483                 o.put("name", t.getName());
484                 o.put("description", t.getDescription());
485                 o.put("owner", t.getOwner());
486                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
487                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
488
489                 return o;
490         }
491
492         /**
493          * @param dmaapContext
494          *                      @param topicName @param producerId @throws
495          *            ConfigDbException @throws IOException @throws
496          *            TopicExistsException @throws AccessDeniedException @throws
497          * 
498          */
499         @Override
500         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
501                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
502
503                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
504                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
505
506                 // if (user == null) {
507                 //
508                 // LOGGER.info("Authenticating the user, as ACL authentication is not
509                 // provided");
510                 //// String permission =
511                 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
512                 //
513                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
514                 // String permission = aaf.aafPermissionString(topicName, "manage");
515                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
516                 // {
517                 // LOGGER.error("Failed to permit write access to producer [" +
518                 // producerId + "] for topic " + topicName
519                 // + ". Authentication failed.");
520                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
521                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
522                 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
523                 // "+errorMessages.getNotPermitted2()+ topicName);
524                 // LOGGER.info(errRes);
525                 // throw new DMaaPAccessDeniedException(errRes);
526                 // }
527                 // }
528
529                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
530
531                 if (null == topic) {
532                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
533                                         + "] does not exist.");
534                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
535                                         + "] for topic. Topic [" + topicName + "] does not exist.");
536                 }
537
538                 topic.permitWritesFromUser(producerId, user);
539
540                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
541                                 + "]. Sending response.");
542                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
543
544         }
545
546         /**
547          * @param dmaapContext
548          * @param topicName
549          * @param producerId
550          * @throws ConfigDbException
551          * @throws IOException
552          * @throws TopicExistsException
553          * @throws AccessDeniedException
554          * @throws DMaaPAccessDeniedException
555          * 
556          */
557         @Override
558         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
559                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
560                         DMaaPAccessDeniedException {
561
562                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
563                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
564                 // if (user == null) {
565                 //
566                 //// String permission =
567                 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
568                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
569                 // String permission = aaf.aafPermissionString(topicName, "manage");
570                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
571                 // {
572                 // LOGGER.error("Failed to revoke write access to producer [" +
573                 // producerId + "] for topic " + topicName
574                 // + ". Authentication failed.");
575                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
576                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
577                 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
578                 // "+errorMessages.getNotPermitted2()+ topicName);
579                 // LOGGER.info(errRes);
580                 // throw new DMaaPAccessDeniedException(errRes);
581                 //
582                 // }
583                 // }
584
585                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
586
587                 if (null == topic) {
588                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
589                                         + "] does not exist.");
590                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
591                                         + "] for topic. Topic [" + topicName + "] does not exist.");
592                 }
593
594                 topic.denyWritesFromUser(producerId, user);
595
596                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
597                                 + "]. Sending response.");
598                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
599
600         }
601
602         /**
603          * @param dmaapContext
604          * @param topicName
605          * @param consumerId
606          * @throws DMaaPAccessDeniedException
607          */
608         @Override
609         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
610                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
611                         DMaaPAccessDeniedException {
612
613                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
614                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
615                 // if (user == null) {
616                 //
617                 //// String permission =
618                 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
619                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
620                 // String permission = aaf.aafPermissionString(topicName, "manage");
621                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
622                 // {
623                 // LOGGER.error("Failed to permit read access to consumer [" +
624                 // consumerId + "] for topic " + topicName
625                 // + ". Authentication failed.");
626                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
627                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
628                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
629                 // "+errorMessages.getNotPermitted2()+ topicName);
630                 // LOGGER.info(errRes);
631                 // throw new DMaaPAccessDeniedException(errRes);
632                 // }
633                 // }
634
635                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
636
637                 if (null == topic) {
638                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
639                                         + "] does not exist.");
640                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
641                                         + "] for topic. Topic [" + topicName + "] does not exist.");
642                 }
643
644                 topic.permitReadsByUser(consumerId, user);
645
646                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
647                                 + "]. Sending response.");
648                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
649                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
650         }
651
652         /**
653          * @param dmaapContext
654          * @param topicName
655          * @param consumerId
656          * @throws DMaaPAccessDeniedException
657          */
658         @Override
659         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
660                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
661                         DMaaPAccessDeniedException {
662
663                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
664                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
665                 // if (user == null) {
666                 //// String permission =
667                 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
668                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
669                 // String permission = aaf.aafPermissionString(topicName, "manage");
670                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
671                 // {
672                 // LOGGER.error("Failed to revoke read access to consumer [" +
673                 // consumerId + "] for topic " + topicName
674                 // + ". Authentication failed.");
675                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
676                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
677                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
678                 // "+errorMessages.getNotPermitted2()+ topicName);
679                 // LOGGER.info(errRes);
680                 // throw new DMaaPAccessDeniedException(errRes);
681                 // }
682                 //
683                 //
684                 // }
685
686                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
687
688                 if (null == topic) {
689                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
690                                         + "] does not exist.");
691                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
692                                         + "] for topic. Topic [" + topicName + "] does not exist.");
693                 }
694
695                 topic.denyReadsByUser(consumerId, user);
696
697                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
698                                 + "]. Sending response.");
699                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
700                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
701
702         }
703
704 }