X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdmf%2Fmr%2Fservice%2Fimpl%2FTopicServiceImpl.java;h=91fca9cba61388709337e1b93b45c971f1596de6;hb=refs%2Fchanges%2F63%2F90063%2F4;hp=626828b4774958af362702454413ff0ff60a6a50;hpb=652255ae853495a48937304cd101f39284ee4987;p=dmaap%2Fmessagerouter%2Fmsgrtr.git diff --git a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java index 626828b..91fca9c 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/service/impl/TopicServiceImpl.java @@ -1,17 +1,16 @@ -/** - * - */ -/******************************************************************************* +/* * ============LICENSE_START======================================================= * org.onap.dmaap * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ + * Copyright (C) 2019 Nokia Intellectual Property. All rights reserved. + * ================================================================================= * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,14 +18,17 @@ * limitations under the License. * ============LICENSE_END========================================================= * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ + */ package org.onap.dmaap.dmf.mr.service.impl; +import com.att.ajsc.beans.PropertiesMapBean; import java.io.IOException; +import java.security.Principal; +import javax.servlet.http.HttpServletRequest; +import joptsimple.internal.Strings; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.math.NumberUtils; import org.apache.http.HttpStatus; import org.json.JSONArray; import org.json.JSONException; @@ -68,15 +70,11 @@ import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException; @Service public class TopicServiceImpl implements TopicService { - // private static final Logger LOGGER = - + private static final String TOPIC_CREATE_OP = "create"; private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class); @Autowired private DMaaPErrorMessages errorMessages; - // @Value("${msgRtr.topicfactory.aaf}") - - public DMaaPErrorMessages getErrorMessages() { return errorMessages; } @@ -85,6 +83,30 @@ public class TopicServiceImpl implements TopicService { this.errorMessages = errorMessages; } + + String getPropertyFromAJSCbean(String propertyKey) { + return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey); + } + + String getPropertyFromAJSCmap(String propertyKey) { + return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey); + } + + NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) { + return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + } + + void respondOk(DMaaPContext context, String msg) { + DMaaPResponseBuilder.respondOkWithHtml(context, msg); + } + + void respondOk(DMaaPContext context, JSONObject json) throws IOException { + DMaaPResponseBuilder.respondOk(context, json); + } + + boolean isCadiEnabled() { + return Utils.isCadiEnabled(); + } /** * @param dmaapContext * @throws JSONException @@ -106,7 +128,7 @@ public class TopicServiceImpl implements TopicService { json.put("topics", topicsList); LOGGER.info("Returning list of all the topics."); - DMaaPResponseBuilder.respondOk(dmaapContext, json); + respondOk(dmaapContext, json); } @@ -136,7 +158,7 @@ public class TopicServiceImpl implements TopicService { json.put("topics", topicsList); LOGGER.info("Returning list of all the topics."); - DMaaPResponseBuilder.respondOk(dmaapContext, json); + respondOk(dmaapContext, json); } @@ -171,7 +193,7 @@ public class TopicServiceImpl implements TopicService { o.put("writerAcl", aclToJson(t.getWriterAcl())); LOGGER.info("Returning details of topic " + topicName); - DMaaPResponseBuilder.respondOk(dmaapContext, o); + respondOk(dmaapContext, o); } @@ -188,150 +210,93 @@ public class TopicServiceImpl implements TopicService { * */ @Override - public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) - throws CambriaApiException, DMaaPAccessDeniedException, IOException, TopicExistsException { - LOGGER.info("Creating topic " + topicBean.getTopicName()); + public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException { + String topicName = topicBean.getTopicName(); + LOGGER.info("Creating topic {}",topicName); + String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP); - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - String key = null; - String appName = dmaapContext.getRequest().getHeader("AppName"); - String enfTopicName = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, - "enforced.topic.name.AAF"); - - if (user != null) { - key = user.getKey(); - - if (enfTopicName != null && topicBean.getTopicName().indexOf(enfTopicName) >= 0) { + try { + final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions"); + final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas"); - LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); + final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(), + key, partitions, replicas, topicBean.isTransactionEnabled()); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Failed to create topic: Access Denied.User does not have permission to perform create topic"); + LOGGER.info("Topic {} created successfully. Sending response", topicName); + respondOk(dmaapContext, topicToJson(t)); + } catch (JSONException ex) { - LOGGER.info(errRes.toString()); - // throw new DMaaPAccessDeniedException(errRes); + LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, + DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); + LOGGER.info(errRes.toString()); + throw new CambriaApiException(errRes); - } - } - // else if (user==null && - // (null==dmaapContext.getRequest().getHeader("Authorization") && null - // == dmaapContext.getRequest().getHeader("cookie")) ) { - else if (Utils.isCadiEnabled()&&user == null && null == dmaapContext.getRequest().getHeader("Authorization") - && (null == appName && null == dmaapContext.getRequest().getHeader("cookie"))) { - LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Failed to create topic: Access Denied.User does not have permission to perform create topic"); + } catch (ConfigDbException ex) { + LOGGER.error("Failed to create topic "+ topicName +". Config DB Exception", ex); + ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, + DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); LOGGER.info(errRes.toString()); - // throw new DMaaPAccessDeniedException(errRes); + throw new CambriaApiException(errRes); + } catch (Broker1.TopicExistsException ex) { + LOGGER.error( "Failed to create topic "+ topicName +". Topic already exists.",ex); } + } - if (user == null && (null != dmaapContext.getRequest().getHeader("Authorization") - )) { - // if (user == null && - // (null!=dmaapContext.getRequest().getHeader("Authorization") || - // null != dmaapContext.getRequest().getHeader("cookie"))) { - // ACL authentication is not provided so we will use the aaf - // authentication - LOGGER.info("Authorization the topic"); - - String permission = ""; - String nameSpace = ""; - if (topicBean.getTopicName().indexOf(".") > 1) - nameSpace = topicBean.getTopicName().substring(0, topicBean.getTopicName().lastIndexOf(".")); - - String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "msgRtr.topicfactory.aaf"); - - // AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,kSettings_KafkaZookeeper); - - permission = mrFactoryVal + nameSpace + "|create"; + private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException { + String clientId = Strings.EMPTY; + if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) { + LOGGER.info("Performing AAF authorization for topic {} creation.", topicName); + String permission = buildPermission(topicName, operation); DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); + clientId = getAAFclientId(dmaapContext.getRequest()); if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) { - - LOGGER.error("Failed to create topic" + topicBean.getTopicName() + ", Authentication failed."); - - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - "Failed to create topic: Access Denied.User does not have permission to create topic with perm " - + permission); - - LOGGER.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); - - } else { - // if user is null and aaf authentication is ok then key should - // be "" - // key = ""; - /** - * Added as part of AAF user it should return username - */ - - key = dmaapContext.getRequest().getUserPrincipal().getName().toString(); - LOGGER.info("key ==================== " + key); - + LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}", + operation, topicName, clientId, permission); + throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED, + DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), + "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission)); } + } else if(operation.equals(TOPIC_CREATE_OP)){ + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); + clientId = (user != null) ? user.getKey() : Strings.EMPTY; } + return clientId; + } - try { - final String topicName = topicBean.getTopicName(); - final String desc = topicBean.getTopicDescription(); - int partition = topicBean.getPartitionCount(); - // int replica = topicBean.getReplicationCount(); - String defaultPartitions = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "default.partitions"); - String defaultReplicas = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "default.replicas"); - if (partition == 0) { - if(StringUtils.isNotEmpty(defaultPartitions)){ - partition=Integer.parseInt(defaultPartitions); - } - else{ - partition = 1; - } - } - final int partitions = partition; - - int replica = topicBean.getReplicationCount(); - if (replica == 0) { - if(StringUtils.isNotEmpty(defaultReplicas)){ - replica=Integer.parseInt(defaultReplicas); - } - else{ - replica = 1; - } - } - final int replicas = replica; - boolean transactionEnabled = topicBean.isTransactionEnabled(); - - final Broker1 metabroker = getMetaBroker(dmaapContext); - final Topic t = metabroker.createTopic(topicName, desc, key, partitions, replicas, transactionEnabled); + private String getAAFclientId(HttpServletRequest request) { + Principal principal = request.getUserPrincipal(); + if (principal !=null) { + return principal.getName(); + } else { + LOGGER.warn("Performing AAF authorization but user has not been provided in request."); + return null; + } + } - LOGGER.info("Topic created successfully. Sending response"); - DMaaPResponseBuilder.respondOk(dmaapContext, topicToJson(t)); - } catch (JSONException excp) { + private boolean isTopicWithEnforcedAuthorization(String topicName) { + String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF"); + return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace); + } - LOGGER.error("Failed to create topic. Couldn't parse JSON data.", excp); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, - DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); - LOGGER.info(errRes.toString()); - throw new CambriaApiException(errRes); + int getValueOrDefault(int value, String defaultProperty) { + int returnValue = value; + if (returnValue <= 0) { + String defaultValue = getPropertyFromAJSCmap(defaultProperty); + returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1; + returnValue = (returnValue <= 0) ? 1 : returnValue; + } + return returnValue; + } - } catch (ConfigDbException excp1) { + private String buildPermission(String topicName, String operation) { + String nameSpace = (topicName.indexOf('.') > 1) ? + topicName.substring(0, topicName.lastIndexOf('.')) : ""; - LOGGER.error("Failed to create topic. Config DB Exception", excp1); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST, - DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson()); - LOGGER.info(errRes.toString()); - throw new CambriaApiException(errRes); - } catch (org.onap.dmaap.dmf.mr.metabroker.Broker1.TopicExistsException e) { - // TODO Auto-generated catch block - LOGGER.error( e.getMessage()); - } + String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf"); + return mrFactoryValue + nameSpace + "|" + operation; } /** @@ -347,45 +312,10 @@ public class TopicServiceImpl implements TopicService { public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException, CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException { - LOGGER.info(" Deleting topic " + topicName); - /*if (true) { // { - LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed."); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), errorMessages.getCreateTopicFail() + " " - + errorMessages.getNotPermitted1() + " delete " + errorMessages.getNotPermitted2()); - LOGGER.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); - }*/ - - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); - - if (user == null && null != dmaapContext.getRequest().getHeader("Authorization")) { - LOGGER.info("Authenticating the user, as ACL authentication is not provided"); - // String permission = - - String permission = ""; - String nameSpace = topicName.substring(0, topicName.lastIndexOf(".")); - String mrFactoryVal = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, - "msgRtr.topicfactory.aaf"); - - permission = mrFactoryVal + nameSpace + "|destroy"; - DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl(); - if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) { - LOGGER.error("Failed to delete topi" + topicName + ". Authentication failed."); - ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN, - DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(), - errorMessages.getCreateTopicFail() + " " + errorMessages.getNotPermitted1() + " delete " - + errorMessages.getNotPermitted2()); - LOGGER.info(errRes.toString()); - throw new DMaaPAccessDeniedException(errRes); - } - - } - - final Broker1 metabroker = getMetaBroker(dmaapContext); - final Topic topic = metabroker.getTopic(topicName); + authorizeClient(dmaapContext, topicName, "destroy"); + final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName); if (topic == null) { LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist."); throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist."); @@ -394,7 +324,7 @@ public class TopicServiceImpl implements TopicService { // metabroker.deleteTopic(topicName); LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response."); - DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Topic [" + topicName + "] deleted successfully"); + respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully"); } /** @@ -402,7 +332,7 @@ public class TopicServiceImpl implements TopicService { * @param dmaapContext * @return */ - private DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) { + DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) { return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker(); } @@ -429,7 +359,7 @@ public class TopicServiceImpl implements TopicService { final NsaAcl acl = topic.getWriterAcl(); LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response."); - DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl)); + respondOk(dmaapContext, aclToJson(acl)); } @@ -474,7 +404,7 @@ public class TopicServiceImpl implements TopicService { final NsaAcl acl = topic.getReaderAcl(); LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response."); - DMaaPResponseBuilder.respondOk(dmaapContext, aclToJson(acl)); + respondOk(dmaapContext, aclToJson(acl)); } @@ -483,7 +413,7 @@ public class TopicServiceImpl implements TopicService { * @param t * @return */ - private static JSONObject topicToJson(Topic t) { + static JSONObject topicToJson(Topic t) { final JSONObject o = new JSONObject(); o.put("name", t.getName()); @@ -507,7 +437,7 @@ public class TopicServiceImpl implements TopicService { throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException { LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName); - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); // @@ -545,7 +475,7 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName + "]. Sending response."); - DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been granted to publisher."); + respondOk(dmaapContext, "Write access has been granted to publisher."); } @@ -566,7 +496,7 @@ public class TopicServiceImpl implements TopicService { DMaaPAccessDeniedException { LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName); - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); // //// String permission = @@ -601,7 +531,7 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName + "]. Sending response."); - DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, "Write access has been revoked for publisher."); + respondOk(dmaapContext, "Write access has been revoked for publisher."); } @@ -617,7 +547,7 @@ public class TopicServiceImpl implements TopicService { DMaaPAccessDeniedException { LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName); - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); // //// String permission = @@ -651,7 +581,7 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName + "]. Sending response."); - DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, + respondOk(dmaapContext, "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "]."); } @@ -667,7 +597,7 @@ public class TopicServiceImpl implements TopicService { DMaaPAccessDeniedException { LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName); - final NsaApiKey user = DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext); + final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext); //// String permission = @@ -701,7 +631,7 @@ public class TopicServiceImpl implements TopicService { LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName + "]. Sending response."); - DMaaPResponseBuilder.respondOkWithHtml(dmaapContext, + respondOk(dmaapContext, "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "]."); }