sonar critical for errorhandling
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / nsa / cambria / service / impl / TopicServiceImpl.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 /**
23  * 
24  */
25 package com.att.nsa.cambria.service.impl;
26
27 import java.io.IOException;
28
29 import org.apache.http.HttpStatus;
30 import com.att.eelf.configuration.EELFLogger;
31 import com.att.eelf.configuration.EELFManager;
32 import org.json.JSONArray;
33 import org.json.JSONException;
34 import org.json.JSONObject;
35 import org.springframework.beans.factory.annotation.Autowired;
36 import org.springframework.stereotype.Service;
37
38 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
39 import com.att.nsa.cambria.CambriaApiException;
40 import com.att.nsa.cambria.beans.DMaaPContext;
41 import com.att.nsa.cambria.beans.DMaaPKafkaMetaBroker;
42 import com.att.nsa.cambria.beans.TopicBean;
43 import com.att.nsa.cambria.constants.CambriaConstants;
44 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
45 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
46 import com.att.nsa.cambria.exception.DMaaPResponseCode;
47 import com.att.nsa.cambria.exception.ErrorResponse;
48 import com.att.nsa.cambria.metabroker.Broker;
49 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
50 import com.att.nsa.cambria.metabroker.Topic;
51 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
52 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
53 import com.att.nsa.cambria.security.DMaaPAuthenticatorImpl;
54 import com.att.nsa.cambria.service.TopicService;
55 import com.att.nsa.cambria.utils.DMaaPResponseBuilder;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.security.NsaAcl;
58 import com.att.nsa.security.NsaApiKey;
59 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
60
61 /**
62  * @author author
63  *
64  */
65 @Service
66 public class TopicServiceImpl implements TopicService {
67
68         //private static final Logger LOGGER = Logger.getLogger(TopicServiceImpl.class);
69         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
70         @Autowired
71         private DMaaPErrorMessages errorMessages;
72         
73         
74         
75         //@Value("${msgRtr.topicfactory.aaf}")
76         //private String mrFactory;
77         
78         
79         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
80                 this.errorMessages = errorMessages;
81         }
82
83         /**
84          * @param dmaapContext
85          * @throws JSONException
86          * @throws ConfigDbException
87          * @throws IOException
88          * 
89          */
90         @Override
91         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
92
93                 LOGGER.info("Fetching list of all the topics.");
94                 JSONObject json = new JSONObject();
95
96                 JSONArray topicsList = new JSONArray();
97
98                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
99                         topicsList.put(topic.getName());
100                 }
101
102                 json.put("topics", topicsList);
103
104                 LOGGER.info("Returning list of all the topics.");
105                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
106
107         }
108
109         /**
110          * @param dmaapContext
111          * @throws JSONException
112          * @throws ConfigDbException
113          * @throws IOException
114          * 
115          */
116         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
117
118                 LOGGER.info("Fetching list of all the topics.");
119                 JSONObject json = new JSONObject();
120
121                 JSONArray topicsList = new JSONArray();
122
123                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
124                         JSONObject obj = new JSONObject();
125                         obj.put("topicName", topic.getName());
126                         //obj.put("description", topic.getDescription());
127                         obj.put("owner", topic.getOwner());
128                         obj.put("txenabled", topic.isTransactionEnabled());
129                         topicsList.put(obj);
130                 }
131
132                 json.put("topics", topicsList);
133
134                 LOGGER.info("Returning list of all the topics.");
135                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
136
137         }
138
139         
140         /**
141          * @param dmaapContext
142          * @param topicName
143          * @throws ConfigDbException
144          * @throws IOException
145          * @throws TopicExistsException
146          */
147         @Override
148         public void getTopic(DMaaPContext dmaapContext, String topicName)
149                         throws ConfigDbException, IOException, TopicExistsException {
150
151                 LOGGER.info("Fetching details of topic " + topicName);
152                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
153
154                 if (null == t) {
155                         LOGGER.error("Topic [" + topicName + "] does not exist.");
156                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
157                 }
158
159                 JSONObject o = new JSONObject();
160                 o.put ( "name", t.getName () );
161                 o.put ( "description", t.getDescription () );
162                 
163                 if (null!=t.getOwners ())
164                 o.put ( "owner", t.getOwners ().iterator ().next () );
165                 if(null!=t.getReaderAcl ())
166                 o.put ( "readerAcl", aclToJson ( t.getReaderAcl () ) );
167                 if(null!=t.getWriterAcl ())
168                 o.put ( "writerAcl", aclToJson ( t.getWriterAcl () ) );
169         
170                 LOGGER.info("Returning details of topic " + topicName);
171                 DMaaPResponseBuilder.respondOk(dmaapContext, o);
172
173         }
174
175         
176         /**
177          * @param dmaapContext
178          * @param topicBean
179          * @throws CambriaApiException
180          * @throws AccessDeniedException
181          * @throws IOException
182          * @throws TopicExistsException
183          * @throws JSONException
184          * 
185          * 
186          * 
187          */
188         @Override
189         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
190                         throws CambriaApiException, DMaaPAccessDeniedException,IOException, TopicExistsException {
191
192                 LOGGER.info("Creating topic " + topicBean.getTopicName());
193                 
194                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
195                 String key = null;
196                 String appName=dmaapContext.getRequest().getHeader("AppName");
197                 String enfTopicName= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"enforced.topic.name.AAF");
198                 if(user != null)
199                 {
200                         key = user.getKey();
201                         
202                         if(  enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >=0 ) {
203                                 
204                                 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
205                                 
206                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
207                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
208                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
209                                 LOGGER.info(errRes.toString());
210                                 throw new DMaaPAccessDeniedException(errRes);
211                                 
212                         }
213                 }
214                                 
215                 //else if (user==null && (null==dmaapContext.getRequest().getHeader("Authorization") && null == dmaapContext.getRequest().getHeader("cookie")) ) {
216                         /*else if (user == null &&  null==dmaapContext.getRequest().getHeader("Authorization")     && 
217                                          (null == appName  &&  null == dmaapContext.getRequest().getHeader("cookie"))) {
218                         LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
219                         
220                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
221                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
222                                         errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
223                         LOGGER.info(errRes.toString());
224                         throw new DMaaPAccessDeniedException(errRes);
225                 }*/
226                 
227                 if (user == null &&  (null!=dmaapContext.getRequest().getHeader("Authorization") ||
228                                          null != dmaapContext.getRequest().getHeader("cookie"))) {
229                         //if (user == null && (null!=dmaapContext.getRequest().getHeader("Authorization") || null != dmaapContext.getRequest().getHeader("cookie"))) {
230                          // ACL authentication is not provided so we will use the aaf authentication
231                         LOGGER.info("Authorization the topic");
232                 
233                         String permission = "";
234                         String nameSpace="";
235                         if(topicBean.getTopicName().indexOf(".")>1)
236                          nameSpace = topicBean.getTopicName().substring(0,topicBean.getTopicName().lastIndexOf("."));
237                 
238                          String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
239                 
240                         //AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
241                         
242                         permission = mrFactoryVal+nameSpace+"|create";
243                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
244                         
245                         if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
246                         {
247                                 
248                                 LOGGER.error("Failed to create topic"+topicBean.getTopicName()+", Authentication failed.");
249                                 
250                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, 
251                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
252                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" create "+errorMessages.getNotPermitted2());
253                                 LOGGER.info(errRes.toString());
254                                 throw new DMaaPAccessDeniedException(errRes);
255                                 
256                         }else{
257                                 // if user is null and aaf authentication is ok then key should be ""
258                                 //key = "";
259                                 /**
260                                  * Added as part of AAF user it should return username
261                                  */
262                                 
263                                 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
264                                 LOGGER.info("key ==================== "+key);
265                                 
266                         }
267                 }
268
269                 try {
270                         final String topicName = topicBean.getTopicName();
271                         final String desc = topicBean.getTopicDescription();
272
273                         final  int partitions = topicBean.getPartitionCount();
274                 
275                         final int replicas = topicBean.getReplicationCount();
276                         boolean transactionEnabled = topicBean.isTransactionEnabled();
277                         
278
279                         final Broker metabroker = getMetaBroker(dmaapContext);
280                         final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas,
281                                         transactionEnabled);
282
283                         LOGGER.info("Topic created successfully. Sending response");
284                         DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
285                 } catch (JSONException excp) {
286                         
287                         LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
288                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, 
289                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), 
290                                         errorMessages.getIncorrectJson());
291                         LOGGER.info(errRes.toString());
292                         throw new CambriaApiException(errRes);
293                         
294                 }
295         }
296
297         /**
298          * @param dmaapContext
299          * @param topicName
300          * @throws ConfigDbException
301          * @throws IOException
302          * @throws TopicExistsException
303          * @throws CambriaApiException
304          * @throws AccessDeniedException
305          */
306         @Override
307         public void deleteTopic(DMaaPContext dmaapContext, String topicName)
308                         throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
309
310                 LOGGER.info("Deleting topic " + topicName);
311                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
312
313                 if (user == null && null!=dmaapContext.getRequest().getHeader("Authorization")) {
314                         LOGGER.info("Authenticating the user, as ACL authentication is not provided");
315 //                      String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
316                         String permission = "";
317                         String nameSpace="";
318                         nameSpace = topicName.substring(0,topicName.lastIndexOf("."));
319                          String mrFactoryVal=AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.topicfactory.aaf");
320 //                      String tokens[] = topicName.split(".mr.topic.");
321                         permission = mrFactoryVal+nameSpace+"|destroy";
322                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
323                         if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
324                         {
325                                 LOGGER.error("Failed to delete topi"+topicName+". Authentication failed.");
326                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
327                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
328                                                 errorMessages.getCreateTopicFail()+" "+errorMessages.getNotPermitted1()+" delete "+errorMessages.getNotPermitted2());
329                                 LOGGER.info(errRes.toString());
330                                 throw new DMaaPAccessDeniedException(errRes);
331                         }
332                         
333                         
334                 }
335
336                 final Broker metabroker = getMetaBroker(dmaapContext);
337                 final Topic topic = metabroker.getTopic(topicName);
338
339                 if (topic == null) {
340                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
341                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
342                 }
343
344                 metabroker.deleteTopic(topicName);
345
346                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
347                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
348
349         }
350
351         /**
352          * 
353          * @param dmaapContext
354          * @return
355          */
356         private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
357                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
358         }
359
360         /**
361          * @param dmaapContext
362          * @param topicName
363          * @throws ConfigDbException
364          * @throws IOException
365          * @throws TopicExistsException
366          * 
367          */
368         @Override
369         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
370                         throws ConfigDbException, IOException, TopicExistsException {
371                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
372                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
373
374                 if (topic == null) {
375                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
376                         throw new TopicExistsException(
377                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
378                 }
379                 
380                 
381
382                 final NsaAcl acl = topic.getWriterAcl();
383
384                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
385                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
386
387         }
388
389         /**
390          * 
391          * @param acl
392          * @return
393          */
394         private static JSONObject aclToJson(NsaAcl acl) {
395                 final JSONObject o = new JSONObject();
396                 if (acl == null) {
397                         o.put("enabled", false);
398                         o.put("users", new JSONArray());
399                 } else {
400                         o.put("enabled", acl.isActive());
401
402                         final JSONArray a = new JSONArray();
403                         for (String user : acl.getUsers()) {
404                                 a.put(user);
405                         }
406                         o.put("users", a);
407                 }
408                 return o;
409         }
410
411         /**
412          * @param dmaapContext
413          * @param topicName
414          */
415         @Override
416         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
417                         throws IOException, ConfigDbException, TopicExistsException {
418                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
419                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
420
421                 if (topic == null) {
422                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
423                         throw new TopicExistsException(
424                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
425                 }
426
427                 final NsaAcl acl = topic.getReaderAcl();
428
429                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
430                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
431
432         }
433
434         /**
435          * 
436          * @param t
437          * @return
438          */
439         private static JSONObject topicToJson(Topic t) {
440                 final JSONObject o = new JSONObject();
441
442                 o.put("name", t.getName());
443                 o.put("description", t.getDescription());
444                 o.put("owner", t.getOwner());
445                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
446                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
447
448                 return o;
449         }
450
451         /**
452          * @param dmaapContext
453          * @param topicName
454          * @param producerId
455          * @throws ConfigDbException
456          * @throws IOException
457          * @throws TopicExistsException
458          * @throws AccessDeniedException
459          * @throws  
460          * 
461          */
462         @Override
463         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
464                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,CambriaApiException {
465
466                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
467                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
468                 
469 //              if (user == null) {
470 //                      
471 //                      LOGGER.info("Authenticating the user, as ACL authentication is not provided");
472 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
473 //                      
474 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
475 //                      String permission = aaf.aafPermissionString(topicName, "manage");
476 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
477 //                      {
478 //                              LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic " + topicName
479 //                                                                      + ". Authentication failed.");
480 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
481 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
482 //                                              errorMessages.getNotPermitted1()+" <Grant publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
483 //                              LOGGER.info(errRes);
484 //                              throw new DMaaPAccessDeniedException(errRes);
485 //                      }
486 //              }
487
488                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
489
490                 if (null == topic) {
491                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
492                                         + "] does not exist.");
493                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
494                                         + "] for topic. Topic [" + topicName + "] does not exist.");
495                 }
496
497                 topic.permitWritesFromUser(producerId, user);
498
499                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
500                                 + "]. Sending response.");
501                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
502
503         }
504
505         /**
506          * @param dmaapContext
507          * @param topicName
508          * @param producerId
509          * @throws ConfigDbException
510          * @throws IOException
511          * @throws TopicExistsException
512          * @throws AccessDeniedException
513          * @throws DMaaPAccessDeniedException 
514          * 
515          */
516         @Override
517         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
518                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
519
520                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
521                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
522 //              if (user == null) {
523 //                      
524 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
525 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
526 //                      String permission = aaf.aafPermissionString(topicName, "manage");
527 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
528 //                      {
529 //                              LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic " + topicName
530 //                                              + ". Authentication failed.");
531 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
532 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
533 //                                              errorMessages.getNotPermitted1()+" <Revoke publish permissions> "+errorMessages.getNotPermitted2()+ topicName);
534 //                              LOGGER.info(errRes);
535 //                              throw new DMaaPAccessDeniedException(errRes);
536 //                              
537 //                      }
538 //              }
539
540                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
541
542                 if (null == topic) {
543                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
544                                         + "] does not exist.");
545                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
546                                         + "] for topic. Topic [" + topicName + "] does not exist.");
547                 }
548
549                 topic.denyWritesFromUser(producerId, user);
550
551                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
552                                 + "]. Sending response.");
553                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
554
555         }
556
557         /**
558          * @param dmaapContext
559          * @param topicName
560          * @param consumerId
561          * @throws DMaaPAccessDeniedException 
562          */
563         @Override
564         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
565                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
566
567                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
568                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
569 //              if (user == null) {
570 //                      
571 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
572 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
573 //                      String permission = aaf.aafPermissionString(topicName, "manage");
574 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
575 //                      {
576 //                              LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic " + topicName
577 //                                              + ". Authentication failed.");
578 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
579 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
580 //                                              errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
581 //                              LOGGER.info(errRes);
582 //                              throw new DMaaPAccessDeniedException(errRes);
583 //                      }
584 //              }
585
586                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
587
588                 if (null == topic) {
589                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
590                                         + "] does not exist.");
591                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
592                                         + "] for topic. Topic [" + topicName + "] does not exist.");
593                 }
594
595                 topic.permitReadsByUser(consumerId, user);
596
597                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
598                                 + "]. Sending response.");
599                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
600                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
601         }
602
603         /**
604          * @param dmaapContext
605          * @param topicName
606          * @param consumerId
607          * @throws DMaaPAccessDeniedException 
608          */
609         @Override
610         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
611                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, DMaaPAccessDeniedException {
612
613                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
614                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
615 //              if (user == null) {
616 ////                    String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
617 //                      DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
618 //                      String permission = aaf.aafPermissionString(topicName, "manage");
619 //                      if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
620 //                      {
621 //                              LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic " + topicName
622 //                                              + ". Authentication failed.");
623 //                              ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, 
624 //                                              DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), 
625 //                                              errorMessages.getNotPermitted1()+" <Grant consume permissions> "+errorMessages.getNotPermitted2()+ topicName);
626 //                              LOGGER.info(errRes);
627 //                              throw new DMaaPAccessDeniedException(errRes);
628 //                      }
629 //                      
630 //                      
631 //              }
632
633                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
634
635                 if (null == topic) {
636                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
637                                         + "] does not exist.");
638                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
639                                         + "] for topic. Topic [" + topicName + "] does not exist.");
640                 }
641
642                 topic.denyReadsByUser(consumerId, user);
643
644                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
645                                 + "]. Sending response.");
646                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
647                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
648
649         }
650
651
652         
653         
654         
655 }