/******************************************************************************* * ============LICENSE_START======================================================= * org.onap.dmaap * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= * * *******************************************************************************/ package org.onap.dmaap.kafkaAuthorize; import java.util.Map; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.onap.aaf.cadi.PropAccess; import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory; import kafka.network.RequestChannel.Session; import kafka.security.auth.Acl; import kafka.security.auth.Authorizer; import kafka.security.auth.Operation; import kafka.security.auth.Resource; import scala.collection.immutable.Set; /** * A trivial Kafka Authorizer for use with SSL and AAF * Authentication/Authorization. * */ public class KafkaCustomAuthorizer implements Authorizer { private PropAccess access; private static final Logger logger = LoggerFactory.getLogger(KafkaCustomAuthorizer.class); // I'm assuming this is called BEFORE any usage... @Override public void configure(final Map arg0) { // TODO Auto-generate method stub } @Override public void addAcls(final Set arg0, final Resource arg1) { // TODO Auto-generated method stub } @Override public boolean authorize(final Session arg0, final Operation arg1, final Resource arg2) { if (arg0.principal() == null) { return false; } String fullName = arg0.principal().getName(); fullName = fullName != null ? fullName.trim() : fullName; String topicName = null; String namspace = null; String ins = null; String type = null; String action = null; String kafkaactivity = arg1.name(); if (kafkaactivity.equals("Read")) { action = "sub"; } else if (kafkaactivity.equals("Write")) { action = "pub"; } else if (kafkaactivity.equals("Create")) { action = "create"; } else { return true; } if (arg2.resourceType().name().equals("Topic")) { topicName = arg2.name(); } else { return true; } try { if (null != topicName && topicName.indexOf(".") > 0) { if (action.equals("create")) { String instancePart = (System.getenv("msgRtr.topicfactory.aaf") != null) ? System.getenv("msgRtr.topicfactory.aaf") : "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:"; String[] instandType = (instancePart + namspace + "|create").split("|"); ins = instandType[0]; type = instandType[1]; } else if (action.equals("pub") || action.equals("sub")) { namspace = topicName.substring(0, topicName.lastIndexOf(".")); String instancePart = (System.getenv("pubSubInstPart") != null) ? System.getenv("pubSubInstPart") : ".topic"; ins = namspace + instancePart; type = ":topic." + topicName; } logger.info("^Event Received for topic " + topicName + " , User " + fullName + " , action = " + action); } if (null != fullName && fullName.equals("admin")) { return true; } if (null != topicName) { boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider() .hasPermission(fullName, ins, type, action); if (hasResp) { logger.info("Successful Authorization for " + fullName + " on " + topicName + " for " + ins + "|" + type + "|" + action); } if (!hasResp) { logger.info(fullName + " is not allowed in " + ins + "|" + type + "|" + action); return false; } } } catch (final Exception e) { return false; } return true; } @Override public void close() { // TODO Auto-generated method stub } @Override public scala.collection.immutable.Map> getAcls() { // TODO Auto-generated method stub return null; } @Override public scala.collection.immutable.Map> getAcls(final KafkaPrincipal arg0) { // TODO Auto-generated method stub return null; } @Override public boolean removeAcls(final Resource arg0) { // TODO Auto-generated method stub return false; } @Override public boolean removeAcls(final Set arg0, final Resource arg1) { // TODO Auto-generated method stub return false; } public Set getAcls(Resource arg0) { // TODO Auto-generated method stub return null; } }