4 /*******************************************************************************
5 * ============LICENSE_START=======================================================
7 * ================================================================================
8 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
9 * ================================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 *******************************************************************************/
25 package com.att.dmf.mr.service.impl;
27 import java.io.IOException;
29 import org.apache.http.HttpStatus;
30 import org.json.JSONArray;
31 import org.json.JSONException;
32 import org.json.JSONObject;
33 import org.springframework.beans.factory.annotation.Autowired;
34 import org.springframework.stereotype.Service;
36 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
37 import com.att.dmf.mr.CambriaApiException;
38 import com.att.dmf.mr.beans.DMaaPContext;
39 import com.att.dmf.mr.beans.DMaaPKafkaMetaBroker;
40 import com.att.dmf.mr.beans.TopicBean;
41 import com.att.dmf.mr.constants.CambriaConstants;
42 import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
43 import com.att.dmf.mr.exception.DMaaPErrorMessages;
44 import com.att.dmf.mr.exception.DMaaPResponseCode;
45 import com.att.dmf.mr.exception.ErrorResponse;
46 import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
47 import com.att.dmf.mr.metabroker.Broker1;
49 import com.att.dmf.mr.metabroker.Topic;
50 import com.att.dmf.mr.security.DMaaPAAFAuthenticator;
51 import com.att.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
52 import com.att.dmf.mr.security.DMaaPAuthenticatorImpl;
53 import com.att.dmf.mr.service.TopicService;
54 import com.att.dmf.mr.utils.DMaaPResponseBuilder;
55 import com.att.eelf.configuration.EELFLogger;
56 import com.att.eelf.configuration.EELFManager;
57 import com.att.nsa.configs.ConfigDbException;
58 import com.att.nsa.security.NsaAcl;
59 import com.att.nsa.security.NsaApiKey;
60 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
63 * @author muzainulhaque.qazi
67 public class TopicServiceImpl implements TopicService {
69 // private static final Logger LOGGER =
// Class-wide EELF logger, obtained once for TopicServiceImpl and used by the methods of this class.
71 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
// Supplies the message text used when building ErrorResponse objects in createTopic and deleteTopic.
// NOTE(review): the only visible assignment is in setErrorMessages; presumably this is injected by the
// container (an annotation line may have been dropped from this listing) - confirm.
73 private DMaaPErrorMessages errorMessages;
75 // @Value("${msgRtr.topicfactory.aaf}")
// Getter for the errorMessages field.
// NOTE(review): the method body is not visible in this listing; presumably it returns the field - confirm.
78 public DMaaPErrorMessages getErrorMessages() {
// Replaces the DMaaPErrorMessages instance that this service reads when building error responses.
82 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
83 this.errorMessages = errorMessages;
88 * @throws JSONException
89 * @throws ConfigDbException
// Responds with a JSON object of the form {"topics": [name, ...]} holding the name of every topic
// returned by the meta broker's getAllTopics().
// NOTE(review): closing braces for the loop and the method are not visible in this listing.
94 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
95 LOGGER.info("Fetching list of all the topics.");
96 JSONObject json = new JSONObject();
98 JSONArray topicsList = new JSONArray();
// Only the topic name is collected here; getAllTopics() emits a richer object per topic.
100 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
101 topicsList.put(topic.getName());
104 json.put("topics", topicsList);
106 LOGGER.info("Returning list of all the topics.");
// Hand the assembled payload to the response builder as the OK response.
107 DMaaPResponseBuilder.respondOk(dmaapContext, json);
112 * @param dmaapContext
113 * @throws JSONException
114 * @throws ConfigDbException
115 * @throws IOException
// Responds with a JSON object {"topics": [...]} where each topic is described by an object carrying
// "topicName", "owner" and "txenabled".
118 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
120 LOGGER.info("Fetching list of all the topics.");
121 JSONObject json = new JSONObject();
123 JSONArray topicsList = new JSONArray();
// Build one descriptor object per topic known to the meta broker.
125 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
126 JSONObject obj = new JSONObject();
127 obj.put("topicName", topic.getName());
129 obj.put("owner", topic.getOwner());
130 obj.put("txenabled", topic.isTransactionEnabled());
// NOTE(review): no visible line adds obj to topicsList; the statement was probably dropped from this
// listing (original lines 131-133 are missing) - confirm against the real file.
134 json.put("topics", topicsList);
136 LOGGER.info("Returning list of all the topics.");
137 DMaaPResponseBuilder.respondOk(dmaapContext, json);
142 * @param dmaapContext
144 * @throws ConfigDbException
145 * @throws IOException
146 * @throws TopicExistsException
// Looks up a single topic by name and responds with a JSON object containing "name", "description"
// and, when the corresponding getter returns non-null, "owner", "readerAcl" and "writerAcl".
// Throws TopicExistsException (with a "does not exist" message) on the missing-topic path.
149 public void getTopic(DMaaPContext dmaapContext, String topicName)
150 throws ConfigDbException, IOException, TopicExistsException {
152 LOGGER.info("Fetching details of topic " + topicName);
153 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the guard for this error path is not visible in the listing; presumably it is a
// null check on t - confirm.
156 LOGGER.error("Topic [" + topicName + "] does not exist.");
157 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
160 JSONObject o = new JSONObject();
161 o.put("name", t.getName());
162 o.put("description", t.getDescription());
// Only the first element of the owners collection is reported as the owner.
// NOTE(review): only null is checked; if getOwners() can return an empty collection,
// iterator().next() would throw NoSuchElementException - confirm it is never empty.
164 if (null != t.getOwners())
165 o.put("owner", t.getOwners().iterator().next());
166 if (null != t.getReaderAcl())
167 o.put("readerAcl", aclToJson(t.getReaderAcl()));
168 if (null != t.getWriterAcl())
169 o.put("writerAcl", aclToJson(t.getWriterAcl()));
171 LOGGER.info("Returning details of topic " + topicName);
172 DMaaPResponseBuilder.respondOk(dmaapContext, o);
177 * @param dmaapContext
179 * @throws CambriaApiException
180 * @throws AccessDeniedException
181 * @throws IOException
182 * @throws TopicExistsException
183 * @throws JSONException
// Creates a topic from the supplied TopicBean after an authorization step, then responds with the
// JSON form of the created topic (topicToJson).
// Visible flow: resolve the API-key user, evaluate two rejection branches, optionally run an AAF
// permission check, read name/description/partition/replication/transaction settings from the bean,
// call createTopic on the meta broker, and translate JSON/config-DB failures into CambriaApiException.
// NOTE(review): this listing has dropped lines (e.g. the declaration of key, several if/try openers
// and closing braces), so the exact nesting of the branches below cannot be confirmed from here.
189 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean)
190 throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException {
191 LOGGER.info("Creating topic " + topicBean.getTopicName());
// API-key user for this request; the branches below test it against null.
193 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
195 String appName = dmaapContext.getRequest().getHeader("AppName");
// Configured name fragment read from the "enforced.topic.name.AAF" property.
196 String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
197 "enforced.topic.name.AAF");
// Branch 1: the requested topic name contains the enforced fragment. A 401 ErrorResponse is built
// and logged.
// NOTE(review): the throw for this branch is commented out below, so as listed the request is not
// rejected here - confirm whether that is intentional.
202 if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) {
204 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
206 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
207 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
208 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
210 LOGGER.info(errRes.toString());
211 // throw new DMaaPAccessDeniedException(errRes);
215 // else if (user==null &&
216 // (null==dmaapContext.getRequest().getHeader("Authorization") && null
217 // == dmaapContext.getRequest().getHeader("cookie")) ) {
// Branch 2: no API-key user and none of the Authorization header, AppName header or cookie header
// is present. Same 401 ErrorResponse is built and logged.
// NOTE(review): the throw is commented out here as well.
218 else if (user == null && null == dmaapContext.getRequest().getHeader("Authorization")
219 && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) {
220 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
222 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
223 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
224 "Failed to create topic: Access Denied.User does not have permission to perform create topic");
226 LOGGER.info(errRes.toString());
227 // throw new DMaaPAccessDeniedException(errRes);
// AAF path: entered when there is no API-key user but an Authorization header is present.
// NOTE(review): the rest of this condition (original line 231) is not visible in the listing.
230 if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization")
232 // if (user == null &&
233 // (null!=dmaapContext.getRequest().getHeader("Authorization") ||
234 // null != dmaapContext.getRequest().getHeader("cookie"))) {
235 // ACL authentication is not provided so we will use the aaf
237 LOGGER.info("Authorization the topic");
239 String permission = "";
240 String nameSpace = "";
// Namespace is everything before the last '.' in the topic name; it stays empty when the first '.'
// is at index 0 or 1, or when there is no '.' at all.
241 if (topicBean.getTopicName().indexOf(".") > 1)
242 nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf("."));
244 String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
245 "msgRtr.topicfactory.aaf");
247 // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper);
// Permission string checked against AAF: <msgRtr.topicfactory.aaf value><namespace>|create
249 permission = mrFactoryVal + nameSpace + "|create";
250 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
// Unlike the two branches above, a failed AAF check does throw DMaaPAccessDeniedException.
252 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
254 LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed.");
256 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
257 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
258 "Failed to create topic: Access Denied.User does not have permission to create topic with perm "
261 LOGGER.info(errRes.toString());
262 throw new DMaaPAccessDeniedException(errRes);
265 // if user is null and aaf authentication is ok then key should
269 * Added as part of AAF user it should return username
// On the AAF path the key passed to the broker is the request's user principal name.
// NOTE(review): getUserPrincipal() is dereferenced without a null check - confirm it cannot be null here.
272 key = dmaapContext.getRequest().getUserPrincipal().getName().toString();
273 LOGGER.info("key ==================== " + key);
// Gather creation parameters from the bean.
279 final String topicName = topicBean.getTopicName();
280 final String desc = topicBean.getTopicDescription();
281 int partition = topicBean.getPartitionCount();
282 // int replica = topicBean.getReplicationCount();
// NOTE(review): the body of this zero check is not visible; presumably it substitutes a default
// partition count - confirm. The same applies to replica below, whose check is not visible at all.
283 if (partition == 0) {
286 final int partitions = partition;
288 int replica = topicBean.getReplicationCount();
292 final int replicas = replica;
293 boolean transactionEnabled = topicBean.isTransactionEnabled();
// Create the topic through the meta broker and return its JSON representation.
295 final Broker1 metabroker = getMetaBroker(dmaapContext);
296 final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled);
298 LOGGER.info("Topic created successfully. Sending response");
299 DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t));
// JSON failure: reported as 400 / INCORRECT_JSON and rethrown as CambriaApiException.
300 } catch (JSONException excp) {
302 LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp);
303 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
304 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
305 LOGGER.info(errRes.toString());
306 throw new CambriaApiException(errRes);
// Config DB failure.
// NOTE(review): this reuses the INCORRECT_JSON code and message for a config-DB error, which looks
// like a copy-paste of the handler above - confirm the intended response code/message.
308 } catch (ConfigDbException excp1) {
310 LOGGER.error("Failed to create topic. Config DB Exception", excp1);
311 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
312 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
313 LOGGER.info(errRes.toString());
314 throw new CambriaApiException(errRes);
// NOTE(review): the topic-already-exists case is only logged (message only, no stack trace) and no
// rethrow or error response is visible, so the caller may receive no response body - confirm.
315 } catch (com.att.dmf.mr.metabroker.Broker1.TopicExistsException e) {
316 // TODO Auto-generated catch block
317 LOGGER.error( e.getMessage());
322 * @param dmaapContext
324 * @throws ConfigDbException
325 * @throws IOException
326 * @throws TopicExistsException
327 * @throws CambriaApiException
328 * @throws AccessDeniedException
// Handles a topic delete request: optional AAF permission check, existence check, then an HTML
// "deleted successfully" response.
// NOTE(review): this listing has dropped lines (guards and closing braces), so the exact nesting
// cannot be confirmed from here.
331 public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
332 CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
335 LOGGER.info(" Deleting topic " + topicName);
// Rejection path: builds a 403 ErrorResponse and throws DMaaPAccessDeniedException.
// NOTE(review): the condition guarding this path (original line 336) is not visible; as listed it
// would run unconditionally - confirm against the real file.
// NOTE(review): the log text reads "topi" (likely a typo for "topic "), and the response message is
// built from getCreateTopicFail() although this is the delete path - confirm both.
337 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
338 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
339 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " "
340 + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2());
341 LOGGER.info(errRes.toString());
342 throw new DMaaPAccessDeniedException(errRes);
345 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
// AAF path: no API-key user but an Authorization header is present.
347 if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) {
348 LOGGER.info("Authenticating the user, as ACL authentication is not provided");
349 // String permission =
351 String permission = "";
// Namespace is everything before the last '.' in the topic name.
// NOTE(review): unlike createTopic there is no check that the name contains a '.'; for a name
// without one, lastIndexOf returns -1 and substring(0, -1) throws StringIndexOutOfBoundsException.
352 String nameSpace = topicName.substring(0, topicName.lastIndexOf("."));
353 String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
354 "msgRtr.topicfactory.aaf");
// Permission string checked against AAF: <msgRtr.topicfactory.aaf value><namespace>|destroy
356 permission = mrFactoryVal + nameSpace + "|destroy";
357 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
358 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
359 LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed.");
360 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
361 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
362 errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete "
363 + errorMessages.getNotPermitted2());
364 LOGGER.info(errRes.toString());
365 throw new DMaaPAccessDeniedException(errRes);
// Existence check before reporting the result.
370 final Broker1 metabroker = getMetaBroker(dmaapContext);
371 final Topic topic = metabroker.getTopic(topicName);
// NOTE(review): the guard for this error path is not visible; presumably a null check on topic.
374 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
375 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
// NOTE(review): the only visible delete call is commented out, yet the method still logs and
// returns "deleted successfully" - confirm whether deletion happens elsewhere or is disabled.
378 // metabroker.deleteTopic(topicName);
380 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
381 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully");
386 * @param dmaapContext
// Returns the meta broker held by the context's config reader, cast to DMaaPKafkaMetaBroker.
// The cast is unchecked at this call site: a different broker type would raise ClassCastException.
389 private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
390 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
394 * @param dmaapContext
396 * @throws ConfigDbException
397 * @throws IOException
398 * @throws TopicExistsException
// Responds with the JSON form (aclToJson) of the named topic's writer ACL.
// Throws TopicExistsException (with a "does not exist" message) on the missing-topic path.
402 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
403 throws ConfigDbException, IOException, TopicExistsException {
404 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
405 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the guard for this error path is not visible; presumably a null check on topic.
408 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
409 throw new TopicExistsException(
410 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
// Publishers are represented by the topic's writer ACL.
413 final NsaAcl acl = topic.getWriterAcl();
415 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
416 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
// Converts an NsaAcl into a JSON object with an "enabled" flag and a "users" array.
// Two shapes are visible: a fallback with enabled=false and an empty users array, and the normal
// case where "enabled" mirrors acl.isActive() and users are read from acl.getUsers().
// NOTE(review): the branch condition (presumably acl == null), the loop body, the put of the users
// array and the return statement are not visible in this listing - confirm against the real file.
425 private static JSONObject aclToJson(NsaAcl acl) {
426 final JSONObject o = new JSONObject();
428 o.put("enabled", false);
429 o.put("users", new JSONArray());
431 o.put("enabled", acl.isActive());
433 final JSONArray a = new JSONArray();
434 for (String user : acl.getUsers()) {
443 * @param dmaapContext
// Responds with the JSON form (aclToJson) of the named topic's reader ACL.
// Throws TopicExistsException (with a "does not exist" message) on the missing-topic path.
447 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
448 throws IOException, ConfigDbException, TopicExistsException {
449 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
450 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the guard for this error path is not visible; presumably a null check on topic.
453 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
454 throw new TopicExistsException(
455 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
// Consumers are represented by the topic's reader ACL.
458 final NsaAcl acl = topic.getReaderAcl();
460 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
461 DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl));
// Builds the JSON representation of a topic: "name", "description", "owner", "readerAcl" and
// "writerAcl" (the ACLs are converted with aclToJson).
// Unlike getTopic, the ACL getters are passed to aclToJson without a null check here.
// NOTE(review): the return statement is not visible in this listing; presumably it returns o.
470 private static JSONObject topicToJson(Topic t) {
471 final JSONObject o = new JSONObject();
473 o.put("name", t.getName());
474 o.put("description", t.getDescription());
475 o.put("owner", t.getOwner());
476 o.put("readerAcl", aclToJson(t.getReaderAcl()));
477 o.put("writerAcl", aclToJson(t.getWriterAcl()));
483 * @param dmaapContext
484 * @param topicName @param producerId @throws
485 * ConfigDbException @throws IOException @throws
486 * TopicExistsException @throws AccessDeniedException @throws
// Grants write access on a topic to the given producer by calling topic.permitWritesFromUser with
// the request's API-key user, then sends an HTML confirmation.
// Throws TopicExistsException (with a "does not exist" message) on the missing-topic path.
490 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
491 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
493 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
494 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
// The block below is a disabled (commented-out) AAF permission check; no AAF check runs in the
// visible code of this method.
498 // LOGGER.info("Authenticating the user, as ACL authentication is not
500 //// String permission =
507 // LOGGER.error("Failed to permit write access to producer [" +
508 // producerId + "] for topic " + topicName
510 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
511 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
512 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
519 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the guard for this error path is not visible; presumably a null check on topic.
522 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
523 + "] does not exist.");
524 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
525 + "] for topic. Topic [" + topicName + "] does not exist.");
// user may be null here (no check is visible); the call is delegated to the Topic as-is.
528 topic.permitWritesFromUser(producerId, user);
530 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
531 + "]. Sending response.");
532 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher.");
537 * @param dmaapContext
540 * @throws ConfigDbException
541 * @throws IOException
542 * @throws TopicExistsException
543 * @throws AccessDeniedException
544 * @throws DMaaPAccessDeniedException
// Revokes write access on a topic from the given producer by calling topic.denyWritesFromUser with
// the request's API-key user, then sends an HTML confirmation.
// Throws TopicExistsException (with a "does not exist" message) on the missing-topic path.
548 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
549 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
550 DMaaPAccessDeniedException {
552 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
553 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
// The block below is a disabled (commented-out) AAF "manage" permission check; no AAF check runs
// in the visible code of this method.
556 //// String permission =
558 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
559 // String permission = aaf.aafPermissionString(topicName, "manage");
560 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
562 // LOGGER.error("Failed to revoke write access to producer [" +
563 // producerId + "] for topic " + topicName
565 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
566 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
567 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
570 // throw new DMaaPAccessDeniedException(errRes);
575 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the guard for this error path is not visible; presumably a null check on topic.
578 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
579 + "] does not exist.");
580 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
581 + "] for topic. Topic [" + topicName + "] does not exist.");
// user may be null here (no check is visible); the call is delegated to the Topic as-is.
584 topic.denyWritesFromUser(producerId, user);
586 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
587 + "]. Sending response.");
588 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher.");
593 * @param dmaapContext
596 * @throws DMaaPAccessDeniedException
// Grants read access on a topic to the given consumer by calling topic.permitReadsByUser with the
// request's API-key user, then sends an HTML confirmation naming the consumer and topic.
// Throws TopicExistsException (with a "does not exist" message) on the missing-topic path.
599 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
600 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
601 DMaaPAccessDeniedException {
603 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
604 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
// The block below is a disabled (commented-out) AAF "manage" permission check; no AAF check runs
// in the visible code of this method.
607 //// String permission =
610 // String permission = aaf.aafPermissionString(topicName, "manage");
611 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
613 // LOGGER.error("Failed to permit read access to consumer [" +
614 // consumerId + "] for topic " + topicName
616 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
617 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
618 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
625 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the guard for this error path is not visible; presumably a null check on topic.
628 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
629 + "] does not exist.");
630 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
631 + "] for topic. Topic [" + topicName + "] does not exist.");
// user may be null here (no check is visible); the call is delegated to the Topic as-is.
634 topic.permitReadsByUser(consumerId, user);
636 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
637 + "]. Sending response.");
638 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
639 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
643 * @param dmaapContext
646 * @throws DMaaPAccessDeniedException
// Revokes read access on a topic from the given consumer by calling topic.denyReadsByUser with the
// request's API-key user, then sends an HTML confirmation naming the consumer and topic.
// Throws TopicExistsException (with a "does not exist" message) on the missing-topic path.
649 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
650 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
651 DMaaPAccessDeniedException {
653 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
654 final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
// The block below is a disabled (commented-out) AAF "manage" permission check; no AAF check runs
// in the visible code of this method.
656 //// String permission =
659 // String permission = aaf.aafPermissionString(topicName, "manage");
660 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
662 // LOGGER.error("Failed to revoke read access to consumer [" +
663 // consumerId + "] for topic " + topicName
665 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
666 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
667 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
670 // throw new DMaaPAccessDeniedException(errRes);
675 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// NOTE(review): the guard for this error path is not visible; presumably a null check on topic.
678 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
679 + "] does not exist.");
// NOTE(review): the exception text says "permit" while the log line above and the rest of this
// method say "revoke"; this looks like a copy-paste from permitConsumerForTopic - confirm and align.
680 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
681 + "] for topic. Topic [" + topicName + "] does not exist.");
// user may be null here (no check is visible); the call is delegated to the Topic as-is.
684 topic.denyReadsByUser(consumerId, user);
686 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
687 + "]. Sending response.");
688 DMaaPResponseBuilder.respondOkWithHtml(dmaapContext,
689 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");