2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
8 * =================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import com.att.ajsc.beans.PropertiesMapBean;
25 import java.io.IOException;
27 import java.security.Principal;
28 import javax.servlet.http.HttpServletRequest;
29 import joptsimple.internal.Strings;
30 import org.apache.commons.lang.StringUtils;
31 import org.apache.commons.lang.math.NumberUtils;
32 import org.apache.http.HttpStatus;
33 import org.json.JSONArray;
34 import org.json.JSONException;
35 import org.json.JSONObject;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Service;
39 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
40 import org.onap.dmaap.dmf.mr.CambriaApiException;
41 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
42 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
43 import org.onap.dmaap.dmf.mr.beans.TopicBean;
44 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
45 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
47 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
48 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
49 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
50 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
52 import org.onap.dmaap.dmf.mr.metabroker.Topic;
53 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
54 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
55 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
56 import org.onap.dmaap.dmf.mr.service.TopicService;
57 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
58 import org.onap.dmaap.dmf.mr.utils.Utils;
59 import com.att.eelf.configuration.EELFLogger;
60 import com.att.eelf.configuration.EELFManager;
61 import com.att.nsa.configs.ConfigDbException;
62 import com.att.nsa.security.NsaAcl;
63 import com.att.nsa.security.NsaApiKey;
64 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
67 * @author muzainulhaque.qazi
71 public class TopicServiceImpl implements TopicService {
73 private static final String TOPIC_CREATE_OP = "create";
74 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
76 private DMaaPErrorMessages errorMessages;
// Getter paired with setErrorMessages. The method body is not visible in this excerpt;
// presumably it returns the errorMessages field -- confirm against the full file.
78 public DMaaPErrorMessages getErrorMessages() {
// Replaces the DMaaPErrorMessages instance this service reads when it builds error responses
// (createTopic reads errorMessages.getIncorrectJson()).
82 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
83 this.errorMessages = errorMessages;
// Looks up propertyKey in the CambriaConstants.msgRtr_prop property set via PropertiesMapBean.
// Thin instance-level wrapper around the static call.
87 String getPropertyFromAJSCbean(String propertyKey) {
88 return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
// Looks up propertyKey in the CambriaConstants.msgRtr_prop property set via AJSCPropertiesMap.
// Thin instance-level wrapper around the static call.
91 String getPropertyFromAJSCmap(String propertyKey) {
92 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
// Resolves the API key of the caller for the given request context by delegating to
// DMaaPAuthenticatorImpl.getAuthenticatedUser. Callers in this class treat a null result as
// "no authenticated user" (see the null check in authorizeClient).
95 NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) {
96 return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
// Sends a success response carrying a plain message; delegates to
// DMaaPResponseBuilder.respondOkWithHtml.
99 void respondOk(DMaaPContext context, String msg) {
100 DMaaPResponseBuilder.respondOkWithHtml(context, msg);
// Sends a success response carrying a JSON body; delegates to DMaaPResponseBuilder.respondOk
// and propagates its IOException.
103 void respondOk(DMaaPContext context, JSONObject json) throws IOException {
104 DMaaPResponseBuilder.respondOk(context, json);
// Reports whether CADI is enabled, as decided by Utils.isCadiEnabled(). authorizeClient uses
// this to choose between AAF authorization and API-key lookup.
107 boolean isCadiEnabled() {
108 return Utils.isCadiEnabled();
111 * @param dmaapContext
112 * @throws JSONException
113 * @throws ConfigDbException
114 * @throws IOException
// Responds with a JSON object of the form {"topics": [<topic name>, ...]} listing every topic
// returned by the meta broker.
118 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
119 LOGGER.info("Fetching list of all the topics.");
120 JSONObject json = new JSONObject();
122 JSONArray topicsList = new JSONArray();
// Only the topic name is reported here; getAllTopics reports additional per-topic fields.
124 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
125 topicsList.put(topic.getName());
128 json.put("topics", topicsList);
130 LOGGER.info("Returning list of all the topics.");
131 respondOk(dmaapContext, json);
136 * @param dmaapContext
137 * @throws JSONException
138 * @throws ConfigDbException
139 * @throws IOException
// Responds with a JSON object whose "topics" entry is an array; for each topic returned by the
// meta broker an object with "topicName", "owner" and "txenabled" is built.
142 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
144 LOGGER.info("Fetching list of all the topics.");
145 JSONObject json = new JSONObject();
147 JSONArray topicsList = new JSONArray();
149 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
150 JSONObject obj = new JSONObject();
151 obj.put("topicName", topic.getName());
153 obj.put("owner", topic.getOwner());
154 obj.put("txenabled", topic.isTransactionEnabled());
// NOTE(review): the statement that appends obj to topicsList is not visible in this excerpt
// (lines are missing between here and the json.put below) -- confirm it exists in the full file,
// otherwise the response would always contain an empty array.
158 json.put("topics", topicsList);
160 LOGGER.info("Returning list of all the topics.");
161 respondOk(dmaapContext, json);
166 * @param dmaapContext
168 * @throws ConfigDbException
169 * @throws IOException
170 * @throws TopicExistsException
// Responds with the details of a single topic as JSON: "name", "description", and -- when the
// corresponding getter returns non-null -- "owner", "readerAcl" and "writerAcl".
// Throws TopicExistsException with a "does not exist" message for an unknown topic.
173 public void getTopic(DMaaPContext dmaapContext, String topicName)
174 throws ConfigDbException, IOException, TopicExistsException {
176 LOGGER.info("Fetching details of topic " + topicName);
177 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
// The condition guarding the next two lines is not visible in this excerpt; presumably a
// null check on t -- confirm.
180 LOGGER.error("Topic [" + topicName + "] does not exist.");
181 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
184 JSONObject o = new JSONObject();
185 o.put("name", t.getName());
186 o.put("description", t.getDescription());
// Only the first element of getOwners() is reported.
// NOTE(review): iterator().next() throws NoSuchElementException on an empty collection;
// confirm getOwners() can never be non-null but empty.
188 if (null != t.getOwners())
189 o.put("owner", t.getOwners().iterator().next());
190 if (null != t.getReaderAcl())
191 o.put("readerAcl", aclToJson(t.getReaderAcl()));
192 if (null != t.getWriterAcl())
193 o.put("writerAcl", aclToJson(t.getWriterAcl()));
195 LOGGER.info("Returning details of topic " + topicName);
196 respondOk(dmaapContext, o);
201 * @param dmaapContext
203 * @throws CambriaApiException
204 * @throws AccessDeniedException
205 * @throws IOException
206 * @throws TopicExistsException
207 * @throws JSONException
// Creates a topic described by topicBean and responds with its JSON representation.
// Steps visible here: authorize the caller for the "create" operation, resolve partition and
// replica counts (falling back to configured defaults), create the topic through the meta
// broker, then respond with topicToJson(t).
// Failures are translated to CambriaApiException carrying an ErrorResponse.
213 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException {
214 String topicName = topicBean.getTopicName();
215 LOGGER.info("Creating topic {}",topicName);
// key is the client identifier returned by authorizeClient; it is passed to the broker's
// createTopic call below.
216 String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
// The opening of the try block matched by the catch clauses below is not visible in this excerpt.
// Non-positive counts from the bean are replaced using the named default properties.
219 final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions");
220 final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas");
222 final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
223 key, partitions, replicas, topicBean.isTransactionEnabled());
225 LOGGER.info("Topic {} created successfully. Sending response", topicName);
226 respondOk(dmaapContext, topicToJson(t));
227 } catch (JSONException ex) {
// JSON failure -> HTTP 400 with the INCORRECT_JSON response code.
229 LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
230 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
231 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
232 LOGGER.info(errRes.toString());
233 throw new CambriaApiException(errRes);
235 } catch (ConfigDbException ex) {
// NOTE(review): a config DB failure is reported to the client with the same 400 /
// INCORRECT_JSON response as a JSON parse failure -- confirm this is intended.
237 LOGGER.error("Failed to create topic "+ topicName +". Config DB Exception", ex);
238 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
239 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
240 LOGGER.info(errRes.toString());
241 throw new CambriaApiException(errRes);
242 } catch (Broker1.TopicExistsException ex) {
// The remainder of this handler (after the log call) is not visible in this excerpt.
243 LOGGER.error( "Failed to create topic "+ topicName +". Topic already exists.",ex);
// Authorizes the caller for the given operation on topicName and determines the client id.
//
// When CADI is enabled and the topic name falls under the enforced namespace, the request is
// checked against AAF using the permission string from buildPermission; a failed check raises
// DMaaPAccessDeniedException (HTTP 401, ACCESS_NOT_PERMITTED). In that branch the client id is
// taken from the request's user principal.
// Otherwise, for the "create" operation only, the client id is the key of the API-key user
// authenticated for the request, or the empty string when there is none.
//
// The access-denied message and the info log name the actual operation: this method is also
// called with "destroy" (see deleteTopic), so hard-coding "create"/"creation" was misleading.
// The statement returning clientId is not visible in this excerpt.
247 private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException {
248 String clientId = Strings.EMPTY;
249 if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) {
250 LOGGER.info("Performing AAF authorization to {} topic {}.", operation, topicName);
251 String permission = buildPermission(topicName, operation);
252 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
253 clientId = getAAFclientId(dmaapContext.getRequest());
255 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
256 LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}",
257 operation, topicName, clientId, permission);
258 throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
259 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
260 "Failed to "+ operation +" topic: Access Denied. User does not have permission to " + operation + " topic with perm " + permission));
// Constant-first comparison so a null operation cannot raise NullPointerException here.
262 } else if(TOPIC_CREATE_OP.equals(operation)){
263 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
264 clientId = (user != null) ? user.getKey() : Strings.EMPTY;
// Returns the name of the request's user principal when one is present. When the principal is
// null a warning is logged; the value returned in that case is not visible in this excerpt.
269 private String getAAFclientId(HttpServletRequest request) {
270 Principal principal = request.getUserPrincipal();
271 if (principal !=null) {
272 return principal.getName();
274 LOGGER.warn("Performing AAF authorization but user has not been provided in request.");
// True when the "enforced.topic.name.AAF" property is set and topicName starts with its value.
// A missing property (null) means no topic is subject to enforced AAF authorization.
279 private boolean isTopicWithEnforcedAuthorization(String topicName) {
280 String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF");
281 return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace);
// Resolves a count (used by createTopic for partitions and replicas): a positive value is kept
// as is; otherwise the property named by defaultProperty is consulted.
// The statement returning returnValue is not visible in this excerpt.
284 int getValueOrDefault(int value, String defaultProperty) {
285 int returnValue = value;
286 if (returnValue <= 0) {
287 String defaultValue = getPropertyFromAJSCmap(defaultProperty);
// An empty or missing property falls back to 1. NumberUtils.toInt is documented to yield 0 for
// unparsable text, which the clamp on the following line also maps to 1.
288 returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1;
289 returnValue = (returnValue <= 0) ? 1 : returnValue;
// Builds the AAF permission string: <"msgRtr.topicfactory.aaf" property><namespace>|<operation>.
// The namespace is the topic name up to (not including) its LAST '.', or "" when the FIRST '.'
// is at index 0 or 1 or absent.
294 private String buildPermission(String topicName, String operation) {
// NOTE(review): the test uses indexOf (first dot) with threshold > 1 while the cut uses
// lastIndexOf -- confirm that a name like "a.b" is meant to yield an empty namespace.
295 String nameSpace = (topicName.indexOf('.') > 1) ?
296 topicName.substring(0, topicName.lastIndexOf('.')) : "";
// NOTE(review): if the property is unset (null), string concatenation produces a permission
// beginning with the text "null" -- confirm the property is always configured.
298 String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf");
299 return mrFactoryValue + nameSpace + "|" + operation;
303 * @param dmaapContext
305 * @throws ConfigDbException
306 * @throws IOException
307 * @throws TopicExistsException
308 * @throws CambriaApiException
309 * @throws AccessDeniedException
// Handles a topic deletion request: authorizes the caller for the "destroy" operation, looks
// the topic up, and responds with a success message. Throws TopicExistsException with a
// "does not exist" message for an unknown topic.
312 public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
313 CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
315 LOGGER.info(" Deleting topic " + topicName);
316 authorizeClient(dmaapContext, topicName, "destroy");
318 final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// The condition guarding the next two lines is not visible in this excerpt; presumably a
// null check on topic -- confirm.
320 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
321 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
// NOTE(review): the only deletion call visible in this excerpt is the commented-out line
// below, yet success is logged and returned to the client. Confirm whether the topic is
// actually deleted anywhere in the full method.
324 // metabroker.deleteTopic(topicName);
326 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
327 respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
332 * @param dmaapContext
// Returns the meta broker held by the context's config reader, cast to DMaaPKafkaMetaBroker.
// The unchecked cast fails with ClassCastException if a different broker type is configured.
335 DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
336 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
340 * @param dmaapContext
342 * @throws ConfigDbException
343 * @throws IOException
344 * @throws TopicExistsException
// Responds with the JSON form (see aclToJson) of the topic's writer ACL, i.e. its publishers.
// Throws TopicExistsException with a "does not exist" message for an unknown topic.
348 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
349 throws ConfigDbException, IOException, TopicExistsException {
350 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
351 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// The condition guarding the error path below is not visible in this excerpt; presumably a
// null check on topic -- confirm.
354 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
355 throw new TopicExistsException(
356 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
359 final NsaAcl acl = topic.getWriterAcl();
361 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
362 respondOk(dmaapContext, aclToJson(acl));
// Converts an ACL to a JSON object with an "enabled" flag and a "users" array.
// Two shapes are visible: {"enabled": false, "users": []} and {"enabled": acl.isActive(), ...}
// with the users taken from acl.getUsers(). The branch condition separating them and the end
// of the method are not visible in this excerpt; presumably the first shape is used for a
// null ACL -- confirm.
371 private static JSONObject aclToJson(NsaAcl acl) {
372 final JSONObject o = new JSONObject();
374 o.put("enabled", false);
375 o.put("users", new JSONArray());
377 o.put("enabled", acl.isActive());
379 final JSONArray a = new JSONArray();
380 for (String user : acl.getUsers()) {
389 * @param dmaapContext
// Responds with the JSON form (see aclToJson) of the topic's reader ACL, i.e. its consumers.
// Throws TopicExistsException with a "does not exist" message for an unknown topic.
393 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
394 throws IOException, ConfigDbException, TopicExistsException {
395 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
396 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// The condition guarding the error path below is not visible in this excerpt; presumably a
// null check on topic -- confirm.
399 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
400 throw new TopicExistsException(
401 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
404 final NsaAcl acl = topic.getReaderAcl();
406 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
407 respondOk(dmaapContext, aclToJson(acl));
// Serializes a topic to JSON with the keys "name", "description", "owner", "readerAcl" and
// "writerAcl" (ACLs rendered through aclToJson). Used by createTopic for its response body.
// The return statement is not visible in this excerpt.
416 static JSONObject topicToJson(Topic t) {
417 final JSONObject o = new JSONObject();
419 o.put("name", t.getName());
420 o.put("description", t.getDescription());
421 o.put("owner", t.getOwner());
422 o.put("readerAcl", aclToJson(t.getReaderAcl()));
423 o.put("writerAcl", aclToJson(t.getWriterAcl()));
429 * @param dmaapContext
430 * @param topicName @param producerId @throws
431 * ConfigDbException @throws IOException @throws
432 * TopicExistsException @throws AccessDeniedException @throws
// Grants producerId write access to topicName on behalf of the authenticated API-key user,
// then responds with a confirmation message. Throws TopicExistsException with a
// "does not exist" message for an unknown topic.
436 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
437 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
439 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
440 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
// The commented-out lines below are a disabled authorization check; no such check runs in
// the visible code before the ACL is changed.
444 // LOGGER.info("Authenticating the user, as ACL authentication is not
446 //// String permission =
453 // LOGGER.error("Failed to permit write access to producer [" +
454 // producerId + "] for topic " + topicName
456 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
457 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
458 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
465 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// The condition guarding the error path below is not visible in this excerpt; presumably a
// null check on topic -- confirm.
468 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
469 + "] does not exist.");
470 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
471 + "] for topic. Topic [" + topicName + "] does not exist.");
// user may be null here (no null check is visible); presumably the topic implementation
// performs the access decision and raises AccessDeniedException -- confirm.
474 topic.permitWritesFromUser(producerId, user);
476 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
477 + "]. Sending response.");
478 respondOk(dmaapContext, "Write access has been granted to publisher.");
483 * @param dmaapContext
486 * @throws ConfigDbException
487 * @throws IOException
488 * @throws TopicExistsException
489 * @throws AccessDeniedException
490 * @throws DMaaPAccessDeniedException
// Revokes producerId's write access to topicName on behalf of the authenticated API-key user,
// then responds with a confirmation message. Throws TopicExistsException with a
// "does not exist" message for an unknown topic.
494 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
495 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
496 DMaaPAccessDeniedException {
498 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
499 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
// The commented-out lines below are a disabled AAF check for the "manage" permission; no such
// check runs in the visible code before the ACL is changed.
502 //// String permission =
504 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
505 // String permission = aaf.aafPermissionString(topicName, "manage");
506 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
508 // LOGGER.error("Failed to revoke write access to producer [" +
509 // producerId + "] for topic " + topicName
511 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
512 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
513 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
516 // throw new DMaaPAccessDeniedException(errRes);
521 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// The condition guarding the error path below is not visible in this excerpt; presumably a
// null check on topic -- confirm.
524 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
525 + "] does not exist.");
526 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
527 + "] for topic. Topic [" + topicName + "] does not exist.");
530 topic.denyWritesFromUser(producerId, user);
532 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
533 + "]. Sending response.");
534 respondOk(dmaapContext, "Write access has been revoked for publisher.");
539 * @param dmaapContext
542 * @throws DMaaPAccessDeniedException
// Grants consumerId read access to topicName on behalf of the authenticated API-key user,
// then responds with a message naming the consumer and topic. Throws TopicExistsException
// with a "does not exist" message for an unknown topic.
545 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
546 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
547 DMaaPAccessDeniedException {
549 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
550 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
// The commented-out lines below are a disabled AAF check for the "manage" permission; no such
// check runs in the visible code before the ACL is changed.
553 //// String permission =
556 // String permission = aaf.aafPermissionString(topicName, "manage");
557 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
559 // LOGGER.error("Failed to permit read access to consumer [" +
560 // consumerId + "] for topic " + topicName
562 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
563 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
564 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
571 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// The condition guarding the error path below is not visible in this excerpt; presumably a
// null check on topic -- confirm.
574 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
575 + "] does not exist.");
576 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
577 + "] for topic. Topic [" + topicName + "] does not exist.");
580 topic.permitReadsByUser(consumerId, user);
582 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
583 + "]. Sending response.");
584 respondOk(dmaapContext,
585 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
589 * @param dmaapContext
592 * @throws DMaaPAccessDeniedException
// Revokes consumerId's read access to topicName on behalf of the authenticated API-key user,
// then responds with a message naming the consumer and topic. Throws TopicExistsException
// with a "does not exist" message for an unknown topic.
//
// The exception message now says "revoke" (it previously said "permit", copied from
// permitConsumerForTopic), so it agrees with this method's own error log line.
595 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
596 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
597 DMaaPAccessDeniedException {
599 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
600 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
// The commented-out lines below are a disabled AAF check for the "manage" permission; no such
// check runs in the visible code before the ACL is changed.
602 //// String permission =
605 // String permission = aaf.aafPermissionString(topicName, "manage");
606 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
608 // LOGGER.error("Failed to revoke read access to consumer [" +
609 // consumerId + "] for topic " + topicName
611 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
612 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
613 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
616 // throw new DMaaPAccessDeniedException(errRes);
621 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
// The condition guarding the error path below is not visible in this excerpt; presumably a
// null check on topic -- confirm.
624 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
625 + "] does not exist.");
626 throw new TopicExistsException("Failed to revoke read access to consumer [" + consumerId
627 + "] for topic. Topic [" + topicName + "] does not exist.");
630 topic.denyReadsByUser(consumerId, user);
632 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
633 + "]. Sending response.");
634 respondOk(dmaapContext,
635 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");