add configurable default partitions and replicas
[dmaap/messagerouter/msgrtr.git] / src / main / java / org / onap / dmaap / dmf / mr / service / impl / TopicServiceImpl.java
1 /**
2  * 
3  */
4 /*******************************************************************************
5  *  ============LICENSE_START=======================================================
6  *  org.onap.dmaap
7  *  ================================================================================
8  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
9  *  ================================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *        http://www.apache.org/licenses/LICENSE-2.0
14 *  
15  *  Unless required by applicable law or agreed to in writing, software
16  *  distributed under the License is distributed on an "AS IS" BASIS,
17  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  *  See the License for the specific language governing permissions and
19  *  limitations under the License.
20  *  ============LICENSE_END=========================================================
21  *  
22  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  *  
24  *******************************************************************************/
25 package org.onap.dmaap.dmf.mr.service.impl;
26
27 import java.io.IOException;
28
29 import org.apache.commons.lang.StringUtils;
30 import org.apache.http.HttpStatus;
31 import org.json.JSONArray;
32 import org.json.JSONException;
33 import org.json.JSONObject;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.stereotype.Service;
36
37 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
38 import org.onap.dmaap.dmf.mr.CambriaApiException;
39 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
40 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
41 import org.onap.dmaap.dmf.mr.beans.TopicBean;
42 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
43 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
44 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
45 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
46 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
47 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
48 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
49
50 import org.onap.dmaap.dmf.mr.metabroker.Topic;
51 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
52 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
53 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
54 import org.onap.dmaap.dmf.mr.service.TopicService;
55 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
56 import org.onap.dmaap.dmf.mr.utils.Utils;
57 import com.att.eelf.configuration.EELFLogger;
58 import com.att.eelf.configuration.EELFManager;
59 import com.att.nsa.configs.ConfigDbException;
60 import com.att.nsa.security.NsaAcl;
61 import com.att.nsa.security.NsaApiKey;
62 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
63
64 /**
65  * @author muzainulhaque.qazi
66  *
67  */
68 @Service
69 public class TopicServiceImpl implements TopicService {
70
71         // private static final Logger LOGGER =
72         
73         private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
74         @Autowired
75         private DMaaPErrorMessages errorMessages;
76
77         // @Value("${msgRtr.topicfactory.aaf}")
78         
79
80         public DMaaPErrorMessages getErrorMessages() {
81                 return errorMessages;
82         }
83
84         public void setErrorMessages(DMaaPErrorMessages errorMessages) {
85                 this.errorMessages = errorMessages;
86         }
87
88         /**
89          * @param dmaapContext
90          * @throws JSONException
91          * @throws ConfigDbException
92          * @throws IOException
93          * 
94          */
95         @Override
96         public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
97                 LOGGER.info("Fetching list of all the topics.");
98                 JSONObject json = new JSONObject();
99
100                 JSONArray topicsList = new JSONArray();
101
102                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
103                         topicsList.put(topic.getName());
104                 }
105
106                 json.put("topics", topicsList);
107
108                 LOGGER.info("Returning list of all the topics.");
109                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
110
111         }
112
113         /**
114          * @param dmaapContext
115          * @throws JSONException
116          * @throws ConfigDbException
117          * @throws IOException
118          * 
119          */
120         public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
121
122                 LOGGER.info("Fetching list of all the topics.");
123                 JSONObject json = new JSONObject();
124
125                 JSONArray topicsList = new JSONArray();
126
127                 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
128                         JSONObject obj = new JSONObject();
129                         obj.put("topicName", topic.getName());
130                         
131                         obj.put("owner", topic.getOwner());
132                         obj.put("txenabled", topic.isTransactionEnabled());
133                         topicsList.put(obj);
134                 }
135
136                 json.put("topics", topicsList);
137
138                 LOGGER.info("Returning list of all the topics.");
139                 DMaaPResponseBuilder.respondOk(dmaapContext, json);
140
141         }
142
143         /**
144          * @param dmaapContext
145          * @param topicName
146          * @throws ConfigDbException
147          * @throws IOException
148          * @throws TopicExistsException
149          */
150         @Override
151         public void getTopic(DMaaPContext dmaapContext, String topicName)
152                         throws ConfigDbException, IOException, TopicExistsException {
153
154                 LOGGER.info("Fetching details of topic " + topicName);
155                 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
156
157                 if (null == t) {
158                         LOGGER.error("Topic [" + topicName + "] does not exist.");
159                         throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
160                 }
161
162                 JSONObject o = new JSONObject();
163                 o.put("name", t.getName());
164                 o.put("description", t.getDescription());
165
166                 if (null != t.getOwners())
167                         o.put("owner", t.getOwners().iterator().next());
168                 if (null != t.getReaderAcl())
169                         o.put("readerAcl", aclToJson(t.getReaderAcl()));
170                 if (null != t.getWriterAcl())
171                         o.put("writerAcl", aclToJson(t.getWriterAcl()));
172
173                 LOGGER.info("Returning details of topic " + topicName);
174                 DMaaPResponseBuilder.respondOk(dmaapContext, o);
175
176         }
177
178         /**
179          * @param dmaapContext
180          * @param topicBean
181          * @throws CambriaApiException
182          * @throws AccessDeniedException
183          * @throws IOException
184          * @throws TopicExistsException
185          * @throws JSONException
186          * 
187          * 
188          * 
189          */
190         @Override
191         public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
192                         throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
193                 LOGGER.info("Creating topic " + topicBean.getTopicName());
194
195                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
196                 String key = null;
197                 String appName = dmaapContext.getRequest().getHeader("AppName");
198                 String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
199                                 "enforced.topic.name.AAF");
200
201                 if (user != null) {
202                         key = user.getKey();
203
204                         if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
205
206                                 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
207
208                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
209                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
210                                                 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
211
212                                 LOGGER.info(errRes.toString());
213                                 // throw new DMaaPAccessDeniedException(errRes);
214
215                         }
216                 }
217                 // else if (user==null &&
218                 // (null==dmaapContext.getRequest().getHeader("Authorization") && null
219                 // == dmaapContext.getRequest().getHeader("cookie")) ) {
220                 else if (Utils.isCadiEnabled()&&user == null && null == dmaapContext.getRequest().getHeader("Authorization")
221                                 && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
222                         LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
223
224                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
225                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
226                                         "Failed to create topic: Access Denied.User does not have permission to perform create topic");
227
228                         LOGGER.info(errRes.toString());
229                         // throw new DMaaPAccessDeniedException(errRes);
230                 }
231
232                 if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization")
233                                 )) {
234                         // if (user == null &&
235                         // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
236                         // null != dmaapContext.getRequest().getHeader("cookie"))) {
237                         // ACL authentication is not provided so we will use the aaf
238                         // authentication
239                         LOGGER.info("Authorization the topic");
240
241                         String permission = "";
242                         String nameSpace = "";
243                         if (topicBean.getTopicName().indexOf(".") > 1)
244                                 nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
245
246                         String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
247                                         "msgRtr.topicfactory.aaf");
248
249                         // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
250
251                         permission = mrFactoryVal + nameSpace + "|create";
252                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
253
254                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
255
256                                 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
257
258                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
259                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
260                                                 "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
261                                                                 + permission);
262
263                                 LOGGER.info(errRes.toString());
264                                 throw new DMaaPAccessDeniedException(errRes);
265
266                         } else {
267                                 // if user is null and aaf authentication is ok then key should
268                                 // be ""
269                                 // key = "";
270                                 /**
271                                  * Added as part of AAF user it should return username
272                                  */
273
274                                 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
275                                 LOGGER.info("key ==================== " + key);
276
277                         }
278                 }
279
280                 try {
281                         final String topicName = topicBean.getTopicName();
282                         final String desc = topicBean.getTopicDescription();
283                         int partition = topicBean.getPartitionCount();
284                         // int replica = topicBean.getReplicationCount();
285                         String defaultPartitions = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
286                                         "default.partitions");
287                         String defaultReplicas = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
288                                         "default.replicas");
289                         if (partition == 0) {
290                                 if(StringUtils.isNotEmpty(defaultPartitions)){
291                                         partition=Integer.parseInt(defaultPartitions);  
292                                 }
293                                 else{
294                                 partition = 1;
295                                 }
296                         }
297                         final int partitions = partition;
298
299                         int replica = topicBean.getReplicationCount();
300                         if (replica == 0) {
301                                 if(StringUtils.isNotEmpty(defaultReplicas)){
302                                         replica=Integer.parseInt(defaultReplicas);      
303                                 }
304                                 else{
305                                 replica = 1;
306                                 }
307                         }
308                         final int replicas = replica;
309                         boolean transactionEnabled = topicBean.isTransactionEnabled();
310
311                         final Broker1 metabroker = getMetaBroker(dmaapContext);
312                         final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
313
314                         LOGGER.info("Topic created successfully. Sending response");
315                         DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
316                 } catch (JSONException excp) {
317
318                         LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
319                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
320                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
321                         LOGGER.info(errRes.toString());
322                         throw new CambriaApiException(errRes);
323
324                 } catch (ConfigDbException excp1) {
325
326                         LOGGER.error("Failed to create topic.  Config DB Exception", excp1);
327                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
328                                         DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
329                         LOGGER.info(errRes.toString());
330                         throw new CambriaApiException(errRes);
331                 } catch (org.onap.dmaap.dmf.mr.metabroker.Broker1.TopicExistsException e) {
332                         // TODO Auto-generated catch block
333                         LOGGER.error( e.getMessage());
334                 }
335         }
336
337         /**
338          * @param dmaapContext
339          * @param topicName
340          * @throws ConfigDbException
341          * @throws IOException
342          * @throws TopicExistsException
343          * @throws CambriaApiException
344          * @throws AccessDeniedException
345          */
346         @Override
347         public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
348                         CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
349
350
351                 LOGGER.info(" Deleting topic " + topicName);
352                 /*if (true) { // {
353                         LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
354                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
355                                         DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
356                                                         + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
357                         LOGGER.info(errRes.toString());
358                         throw new DMaaPAccessDeniedException(errRes);
359                 }*/
360
361                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
362
363                 if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
364                         LOGGER.info("Authenticating the user, as ACL authentication is not provided");
365                         // String permission =
366                         
367                         String permission = "";
368                         String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
369                         String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
370                                         "msgRtr.topicfactory.aaf");
371                         
372                         permission = mrFactoryVal + nameSpace + "|destroy";
373                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
374                         if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
375                                 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
376                                 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
377                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
378                                                 errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
379                                                                 + errorMessages.getNotPermitted2());
380                                 LOGGER.info(errRes.toString());
381                                 throw new DMaaPAccessDeniedException(errRes);
382                         }
383
384                 }
385
386                 final Broker1 metabroker = getMetaBroker(dmaapContext);
387                 final Topic topic = metabroker.getTopic(topicName);
388
389                 if (topic == null) {
390                         LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
391                         throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
392                 }
393
394                 // metabroker.deleteTopic(topicName);
395
396                 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
397                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
398         }
399
400         /**
401          * 
402          * @param dmaapContext
403          * @return
404          */
405         private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
406                 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
407         }
408
409         /**
410          * @param dmaapContext
411          * @param topicName
412          * @throws ConfigDbException
413          * @throws IOException
414          * @throws TopicExistsException
415          * 
416          */
417         @Override
418         public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
419                         throws ConfigDbException, IOException, TopicExistsException {
420                 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
421                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
422
423                 if (topic == null) {
424                         LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
425                         throw new TopicExistsException(
426                                         "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
427                 }
428
429                 final NsaAcl acl = topic.getWriterAcl();
430
431                 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
432                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
433
434         }
435
436         /**
437          * 
438          * @param acl
439          * @return
440          */
441         private static JSONObject aclToJson(NsaAcl acl) {
442                 final JSONObject o = new JSONObject();
443                 if (acl == null) {
444                         o.put("enabled", false);
445                         o.put("users", new JSONArray());
446                 } else {
447                         o.put("enabled", acl.isActive());
448
449                         final JSONArray a = new JSONArray();
450                         for (String user : acl.getUsers()) {
451                                 a.put(user);
452                         }
453                         o.put("users", a);
454                 }
455                 return o;
456         }
457
458         /**
459          * @param dmaapContext
460          * @param topicName
461          */
462         @Override
463         public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
464                         throws IOException, ConfigDbException, TopicExistsException {
465                 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
466                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
467
468                 if (topic == null) {
469                         LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
470                         throw new TopicExistsException(
471                                         "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
472                 }
473
474                 final NsaAcl acl = topic.getReaderAcl();
475
476                 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
477                 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
478
479         }
480
481         /**
482          * 
483          * @param t
484          * @return
485          */
486         private static JSONObject topicToJson(Topic t) {
487                 final JSONObject o = new JSONObject();
488
489                 o.put("name", t.getName());
490                 o.put("description", t.getDescription());
491                 o.put("owner", t.getOwner());
492                 o.put("readerAcl", aclToJson(t.getReaderAcl()));
493                 o.put("writerAcl", aclToJson(t.getWriterAcl()));
494
495                 return o;
496         }
497
498         /**
499          * @param dmaapContext
500          *                      @param topicName @param producerId @throws
501          *            ConfigDbException @throws IOException @throws
502          *            TopicExistsException @throws AccessDeniedException @throws
503          * 
504          */
505         @Override
506         public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
507                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
508
509                 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
510                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
511
512                 
513                 //
514                 // LOGGER.info("Authenticating the user, as ACL authentication is not
515                 
516                 //// String permission =
517                 
518                 //
519                 
520                 
521                 
522                 // {
523                 // LOGGER.error("Failed to permit write access to producer [" +
524                 // producerId + "] for topic " + topicName
525                 
526                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
527                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
528                 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
529                 
530                 
531                 
532                 // }
533                 // }
534
535                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
536
537                 if (null == topic) {
538                         LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
539                                         + "] does not exist.");
540                         throw new TopicExistsException("Failed to permit write access to producer [" + producerId
541                                         + "] for topic. Topic [" + topicName + "] does not exist.");
542                 }
543
544                 topic.permitWritesFromUser(producerId, user);
545
546                 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
547                                 + "]. Sending response.");
548                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
549
550         }
551
552         /**
553          * @param dmaapContext
554          * @param topicName
555          * @param producerId
556          * @throws ConfigDbException
557          * @throws IOException
558          * @throws TopicExistsException
559          * @throws AccessDeniedException
560          * @throws DMaaPAccessDeniedException
561          * 
562          */
563         @Override
564         public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
565                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
566                         DMaaPAccessDeniedException {
567
568                 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
569                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
570                 
571                 //
572                 //// String permission =
573                 
574                 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
575                 // String permission = aaf.aafPermissionString(topicName, "manage");
576                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
577                 // {
578                 // LOGGER.error("Failed to revoke write access to producer [" +
579                 // producerId + "] for topic " + topicName
580                 
581                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
582                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
583                 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
584                 
585                 
586                 // throw new DMaaPAccessDeniedException(errRes);
587                 //
588         
589                 // }
590
591                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
592
593                 if (null == topic) {
594                         LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
595                                         + "] does not exist.");
596                         throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
597                                         + "] for topic. Topic [" + topicName + "] does not exist.");
598                 }
599
600                 topic.denyWritesFromUser(producerId, user);
601
602                 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
603                                 + "]. Sending response.");
604                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
605
606         }
607
608         /**
609          * @param dmaapContext
610          * @param topicName
611          * @param consumerId
612          * @throws DMaaPAccessDeniedException
613          */
614         @Override
615         public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
616                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
617                         DMaaPAccessDeniedException {
618
619                 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
620                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
621                 
622                 //
623                 //// String permission =
624                 
625                 
626                 // String permission = aaf.aafPermissionString(topicName, "manage");
627                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
628                 // {
629                 // LOGGER.error("Failed to permit read access to consumer [" +
630                 // consumerId + "] for topic " + topicName
631                 
632                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
633                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
634                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
635                 
636                 
637                 
638                 // }
639                 // }
640
641                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
642
643                 if (null == topic) {
644                         LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
645                                         + "] does not exist.");
646                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
647                                         + "] for topic. Topic [" + topicName + "] does not exist.");
648                 }
649
650                 topic.permitReadsByUser(consumerId, user);
651
652                 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
653                                 + "]. Sending response.");
654                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
655                                 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
656         }
657
658         /**
659          * @param dmaapContext
660          * @param topicName
661          * @param consumerId
662          * @throws DMaaPAccessDeniedException
663          */
664         @Override
665         public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
666                         throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
667                         DMaaPAccessDeniedException {
668
669                 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
670                 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
671                 
672                 //// String permission =
673                 
674                 
675                 // String permission = aaf.aafPermissionString(topicName, "manage");
676                 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
677                 // {
678                 // LOGGER.error("Failed to revoke read access to consumer [" +
679                 // consumerId + "] for topic " + topicName
680                 
681                 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
682                 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
683                 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
684                 
685                 
686                 // throw new DMaaPAccessDeniedException(errRes);
687                 // }
688                 //
689                 //
690         
691                 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
692
693                 if (null == topic) {
694                         LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
695                                         + "] does not exist.");
696                         throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
697                                         + "] for topic. Topic [" + topicName + "] does not exist.");
698                 }
699
700                 topic.denyReadsByUser(consumerId, user);
701
702                 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
703                                 + "]. Sending response.");
704                 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
705                                 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");
706
707         }
708
709 }