X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2FkafkaAuthorize%2FKafkaCustomAuthorizer.java;fp=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2FkafkaAuthorize%2FKafkaCustomAuthorizer.java;h=cb33e29d475383ad0899ab0ae6d8391cb2efb127;hb=b01a7330883cbd5bce618ea44ea0f86ce6332729;hp=0000000000000000000000000000000000000000;hpb=ba5557be62af8b3c4f91268a169447adeca85dbc;p=dmaap%2Fkafka11aaf.git diff --git a/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java b/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java new file mode 100644 index 0000000..cb33e29 --- /dev/null +++ b/src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java @@ -0,0 +1,153 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 +* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * + *******************************************************************************/ +package org.onap.dmaap.kafkaAuthorize; + +import java.util.Map; + +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.onap.aaf.cadi.PropAccess; +import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory; +import kafka.network.RequestChannel.Session; +import kafka.security.auth.Acl; +import kafka.security.auth.Authorizer; +import kafka.security.auth.Operation; +import kafka.security.auth.Resource; +import scala.collection.immutable.Set; + +/** + * A trivial Kafka Authorizer for use with SSL and AAF + * Authentication/Authorization. + * + */ +public class KafkaCustomAuthorizer implements Authorizer { + private PropAccess access; + private static final Logger logger = LoggerFactory.getLogger(KafkaCustomAuthorizer.class); + + // I'm assuming this is called BEFORE any usage... + @Override + public void configure(final Map arg0) { + // TODO Auto-generate method stub + } + + @Override + public void addAcls(final Set arg0, final Resource arg1) { + // TODO Auto-generated method stub + + } + + @Override + public boolean authorize(final Session arg0, final Operation arg1, final Resource arg2) { + if (arg0.principal() == null) { + return false; + } + + String fullName = arg0.principal().getName(); + fullName = fullName != null ? fullName.trim() : fullName; + String topicName = null; + String namspace = null; + String ins = null; + String type = null; + String action = null; + + String kafkaactivity = arg1.name(); + + if (kafkaactivity.equals("Read")) { + action = "sub"; + } else if (kafkaactivity.equals("Write")) { + action = "pub"; + } else if (kafkaactivity.equals("Describe")) { + return true; + } + if (arg2.resourceType().name().equals("Topic")) { + topicName = arg2.name(); + } else { + return true; + } + + try { + + if (null != topicName && topicName.indexOf(".") > 0) { + namspace = topicName.substring(0, topicName.lastIndexOf(".")); + ins = namspace + ".topic"; + type = ":topic." + topicName; + logger.info("^Event Received for topic " + topicName + " , User " + fullName + " , action = " + action); + } + + if (fullName.equals("admin")) { + return true; + } + + if (null != topicName) { + boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider() + .hasPermission(fullName, ins, type, action); + if (hasResp) { + logger.info("Successful Authorization for " + fullName + " on " + topicName + " for " + ins + "|" + + type + "|" + action); + } + if (!hasResp) { + logger.info(fullName + " is not allowed in " + ins + "|" + type + "|" + action); + throw new Exception(fullName + " is not allowed in " + ins + "|" + type + "|" + action); + } + } + } catch (final Exception e) { + return false; + } + return true; + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public scala.collection.immutable.Map> getAcls() { + // TODO Auto-generated method stub + return null; + } + + @Override + public scala.collection.immutable.Map> getAcls(final KafkaPrincipal arg0) { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean removeAcls(final Resource arg0) { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean removeAcls(final Set arg0, final Resource arg1) { + // TODO Auto-generated method stub + return false; + } + + public Set getAcls(Resource arg0) { + // TODO Auto-generated method stub + return null; + } +}