X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fkafkaauthorize%2FKafkaCustomAuthorizer.java;fp=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fkafkaauthorize%2FKafkaCustomAuthorizer.java;h=0000000000000000000000000000000000000000;hb=05eba8cb421bb948f5f72b8adec7cd34429391f4;hp=09f704a1d7244bb402df9045e352b9c5f8464cdb;hpb=354b0ae502cbf671ca9b045bda350c107396809b;p=dmaap%2Fkafka11aaf.git diff --git a/src/main/java/org/onap/dmaap/kafkaauthorize/KafkaCustomAuthorizer.java b/src/main/java/org/onap/dmaap/kafkaauthorize/KafkaCustomAuthorizer.java deleted file mode 100644 index 09f704a..0000000 --- a/src/main/java/org/onap/dmaap/kafkaauthorize/KafkaCustomAuthorizer.java +++ /dev/null @@ -1,233 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * Modification copyright (C) 2021 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 -* - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * - *******************************************************************************/ -package org.onap.dmaap.kafkaauthorize; - -import java.util.EnumSet; -import java.util.Map; - -import org.apache.kafka.common.acl.AclOperation; -import org.apache.kafka.common.security.auth.KafkaPrincipal; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory; -import org.onap.dmaap.commonauth.kafka.base.authorization.Cadi3AAFProvider; - -import kafka.network.RequestChannel.Session; -import kafka.security.auth.Acl; -import kafka.security.auth.Authorizer; -import kafka.security.auth.Operation; -import kafka.security.auth.Resource; -import scala.collection.immutable.Set; - -/** - * A trivial Kafka Authorizer for use with SSL and AAF - * Authentication/Authorization. - * - */ -public class KafkaCustomAuthorizer implements Authorizer { - - private final String[] adminPermission = new String[3]; - protected static final EnumSet TOPIC_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.DESCRIBE_CONFIGS); - protected static final EnumSet TOPIC_READ_WRITE_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.WRITE, - AclOperation.READ, AclOperation.DESCRIBE_CONFIGS); - protected static final EnumSet TOPIC_ADMIN_OPERATIONS = EnumSet.of(AclOperation.ALTER, - AclOperation.ALTER_CONFIGS, AclOperation.CREATE); - static final String TOPIC = "Topic"; - - private static final Logger logger = LoggerFactory.getLogger(KafkaCustomAuthorizer.class); - - @Override - public void configure(final Map arg0) { - // TODO Auto-generate method stub - } - - @Override - public void addAcls(final Set arg0, final Resource arg1) { - // TODO Auto-generated method stub - - } - - private String[] getTopicPermission(String topicName, AclOperation aclOperation) { - - String namspace = topicName.substring(0, topicName.lastIndexOf(".")); - String[] permission = new String[3]; - if (TOPIC_READ_WRITE_DESCRIBE_OPERATIONS.contains(aclOperation)) { - permission[0] = namspace + ".topic"; - String instancePart = (System.getenv("pubSubInstPart") != null) ? System.getenv("pubSubInstPart") - : ".topic"; - permission[1] = instancePart + topicName; - - if (aclOperation.equals(AclOperation.WRITE)) { - permission[2] = "pub"; - } else if (aclOperation.equals(AclOperation.READ)) { - permission[2] = "sub"; - - } else if (TOPIC_DESCRIBE_OPERATIONS.contains(aclOperation)) { - permission[2] = "view"; - - } - } else if (aclOperation.equals(AclOperation.DELETE)) { - permission = (System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|destroy").split("\\|"); - - } else if (TOPIC_ADMIN_OPERATIONS.contains(aclOperation)) { - permission = (System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|create").split("\\|"); - } - - return permission; - } - - private String[] getAdminPermission() { - - if (adminPermission[0] == null) { - adminPermission[0] = System.getProperty("namespace") + ".kafka.access"; - adminPermission[1] = "*"; - adminPermission[2] = "*"; - } - - return adminPermission; - } - - private String[] getPermission(AclOperation aclOperation, String resource, String topicName) { - String[] permission = new String[3]; - switch (aclOperation) { - - case ALTER: - case ALTER_CONFIGS: - case CREATE: - case DELETE: - if (resource.equals(TOPIC)) { - permission = getTopicPermission(topicName, aclOperation); - } else if (resource.equals("Cluster")) { - permission = getAdminPermission(); - } - break; - case DESCRIBE_CONFIGS: - case READ: - case WRITE: - if (resource.equals(TOPIC)) { - permission = getTopicPermission(topicName, aclOperation); - } - break; - case IDEMPOTENT_WRITE: - if (resource.equals("Cluster")) { - permission = getAdminPermission(); - } - break; - default: - break; - - } - return permission; - - } - - @Override - public boolean authorize(final Session arg0, final Operation arg1, final Resource arg2) { - if (arg0.principal() == null) { - return false; - } - - String fullName = arg0.principal().getName(); - fullName = fullName != null ? fullName.trim() : fullName; - String topicName = null; - String[] permission; - - String resource = arg2.resourceType().name(); - - if (resource.equals(TOPIC)) { - topicName = arg2.name(); - } - - if (fullName != null && fullName.equals(Cadi3AAFProvider.getKafkaUsername())) { - return true; - } - - if ((!Cadi3AAFProvider.isCadiEnabled())||(null != topicName && !topicName.startsWith("org.onap"))) { - return true; - } - - permission = getPermission(arg1.toJava(), resource, topicName); - - if (permission[0] != null) { - return !checkPermissions(fullName, topicName, permission); - } - return true; - } - - private boolean checkPermissions(String fullName, String topicName, String[] permission) { - try { - - if (null != topicName) { - boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider() - .hasPermission(fullName, permission[0], permission[1], permission[2]); - if (hasResp) { - logger.info("Successful Authorization for {} on {} for {} | {} | {}", fullName, topicName, - permission[0], permission[1], permission[2]); - } - if (!hasResp) { - logger.info("{} is not allowed in {} | {} | {}", fullName, permission[0], permission[1], - permission[2]); - return true; - } - } - } catch (final Exception e) { - return true; - } - return false; - } - - @Override - public void close() { - // TODO Auto-generated method stub - - } - - @Override - public scala.collection.immutable.Map> getAcls() { - // TODO Auto-generated method stub - return null; - } - - @Override - public scala.collection.immutable.Map> getAcls(final KafkaPrincipal arg0) { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean removeAcls(final Resource arg0) { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean removeAcls(final Set arg0, final Resource arg1) { - // TODO Auto-generated method stub - return false; - } - - public Set getAcls(Resource arg0) { - // TODO Auto-generated method stub - return null; - } -}