Merge "authorization check for more Kafka operations"
authorMandar Sawant <ms5838@att.com>
Wed, 20 Nov 2019 21:20:20 +0000 (21:20 +0000)
committerGerrit Code Review <gerrit@onap.org>
Wed, 20 Nov 2019 21:20:20 +0000 (21:20 +0000)
pom.xml
src/main/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProvider.java
src/main/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizer.java
src/main/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1.java
src/test/java/org/onap/dmaap/commonauth/kafka/base/authorization/AuthorizationProviderFactoryTest.java
src/test/java/org/onap/dmaap/commonauth/kafka/base/authorization/Cadi3AAFProviderTest.java
src/test/java/org/onap/dmaap/kafkaAuthorize/KafkaCustomAuthorizerTest.java
src/test/java/org/onap/dmaap/kafkaAuthorize/PlainSaslServer1Test.java

diff --git a/pom.xml b/pom.xml
index 067fbbc..d9254bf 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
        <parent>
                <groupId>org.onap.oparent</groupId>
                <artifactId>oparent</artifactId>
-               <version>2.0.0</version>
+               <version>2.1.0</version>
        </parent>
 
        <groupId>org.onap.dmaap.kafka</groupId>
                </dependency>
                <dependency>
                        <groupId>org.apache.kafka</groupId>
-                       <artifactId>kafka_2.12</artifactId>
-                       <version>1.1.1</version>
+                       <artifactId>kafka_2.11</artifactId>
+                       <version>2.3.0</version>
+                       <scope>provided</scope>
                </dependency>
+
                <dependency>
                        <groupId>org.powermock</groupId>
                        <artifactId>powermock-api-mockito</artifactId>
index 9cc45fe..da01829 100644 (file)
@@ -22,8 +22,12 @@ package org.onap.dmaap.commonauth.kafka.base.authorization;
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Properties;
 
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +46,45 @@ public class Cadi3AAFProvider implements AuthorizationProvider {
        private static AAFCon<?> aafcon;
        private static final String CADI_PROPERTIES = "/opt/kafka/config/cadi.properties";
        private static final String AAF_LOCATOR_ENV = "aaf_locate_url";
-       private static final String MR_NAMESPACE = "org.onap.dmaap.mr";
+       private static String apiKey = null;
+       private static String kafkaUsername = null;
+       private static AAFAuthn<?> aafAuthn;
+       private static AbsAAFLur<AAFPermission> aafLur;
+
+       private static final Logger logger = LoggerFactory.getLogger(Cadi3AAFProvider.class);
+
+       static {
+
+               Configuration config = Configuration.getConfiguration();
+               try {
+                       if (config == null) {
+                               logger.error("CRITICAL ERROR|Check java.security.auth.login.config VM argument|");
+                       } else {
+                               // read the section for KafkaServer
+                               AppConfigurationEntry[] entries = config.getAppConfigurationEntry("KafkaServer");
+                               if (entries == null) {
+                                       logger.error(
+                                                       "CRITICAL ERROR|Check config contents passed in java.security.auth.login.config VM argument|");
+                                       kafkaUsername = "kafkaUsername";
+                                       apiKey = "apiKey";
+
+                               } else {
+                                       for (int i = 0; i < entries.length; i++) {
+                                               AppConfigurationEntry entry = entries[i];
+                                               Map<String, ?> optionsMap = entry.getOptions();
+                                               kafkaUsername = (String) optionsMap.get("username");
+                                               apiKey = (String) optionsMap.get("password");
+                                       }
+                               }
+                       }
+               } catch (Exception e) {
+                       logger.error("CRITICAL ERROR: JAAS configuration incorrectly set: " + e.getMessage());
+               }
+       }
+
+       public static String getKafkaUsername() {
+               return kafkaUsername;
+       }
 
        public static AAFAuthn<?> getAafAuthn() throws CadiException {
                if (aafAuthn == null) {
@@ -51,11 +93,6 @@ public class Cadi3AAFProvider implements AuthorizationProvider {
                return aafAuthn;
        }
 
-       private static AAFAuthn<?> aafAuthn;
-       private static AbsAAFLur<AAFPermission> aafLur;
-
-       private static final Logger logger = LoggerFactory.getLogger(Cadi3AAFProvider.class);
-
        public Cadi3AAFProvider() {
                setup();
        }
@@ -133,11 +170,20 @@ public class Cadi3AAFProvider implements AuthorizationProvider {
        }
 
        public String authenticate(String userId, String password) throws Exception {
+
                logger.info("^Event received  with   username " + userId);
-               if (userId.equals("admin")) {
-                       logger.info("User Admin by passess AAF call ....");
-                       return null;
+               if (userId.equals(kafkaUsername)) {
+                       if (password.equals(apiKey)) {
+                               logger.info("by passes the authentication for the admin " + kafkaUsername);
+                               return null;
+                       } else {
+                               String errorMessage = "Authentication failed for user " + kafkaUsername;
+                               logger.error(errorMessage);
+                               return errorMessage;
+                       }
+
                }
+
                String aafResponse = aafAuthn.validate(userId, password);
                logger.info("aafResponse=" + aafResponse + " for " + userId);
 
index b78967a..7d38cd2 100644 (file)
  *******************************************************************************/
 package org.onap.dmaap.kafkaAuthorize;
 
+import java.util.EnumSet;
 import java.util.Map;
 
+import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.onap.aaf.cadi.PropAccess;
 import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory;
+import org.onap.dmaap.commonauth.kafka.base.authorization.Cadi3AAFProvider;
+
 import kafka.network.RequestChannel.Session;
 import kafka.security.auth.Acl;
 import kafka.security.auth.Authorizer;
@@ -41,10 +44,16 @@ import scala.collection.immutable.Set;
  * 
  */
 public class KafkaCustomAuthorizer implements Authorizer {
-       private PropAccess access;
+
+       private String[] adminPermission = new String[3];
+       public static final EnumSet<AclOperation> TOPIC_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.DESCRIBE_CONFIGS);
+       public static final EnumSet<AclOperation> TOPIC_READ_WRITE_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.WRITE,
+                       AclOperation.READ, AclOperation.DESCRIBE_CONFIGS);
+       public static final EnumSet<AclOperation> TOPIC_ADMIN_OPERATIONS = EnumSet.of(AclOperation.ALTER,
+                       AclOperation.ALTER_CONFIGS, AclOperation.CREATE);
+
        private static final Logger logger = LoggerFactory.getLogger(KafkaCustomAuthorizer.class);
 
-       // I'm assuming this is called BEFORE any usage...
        @Override
        public void configure(final Map<String, ?> arg0) {
                // TODO Auto-generate method stub
@@ -56,6 +65,81 @@ public class KafkaCustomAuthorizer implements Authorizer {
 
        }
 
+       private String[] getTopicPermission(String topicName, AclOperation aclOperation) {
+
+               String namspace = topicName.substring(0, topicName.lastIndexOf("."));
+               String[] permission = new String[3];
+               if (TOPIC_READ_WRITE_DESCRIBE_OPERATIONS.contains(aclOperation)) {
+                       permission[0] = namspace + ".topic";
+                       String instancePart = (System.getenv("pubSubInstPart") != null) ? System.getenv("pubSubInstPart")
+                                       : ".topic";
+                       permission[1] = instancePart + topicName;
+
+                       if (aclOperation.equals(AclOperation.WRITE)) {
+                               permission[2] = "pub";
+                       } else if (aclOperation.equals(AclOperation.READ)) {
+                               permission[2] = "sub";
+
+                       } else if (TOPIC_DESCRIBE_OPERATIONS.contains(aclOperation)) {
+                               permission[2] = "view";
+
+                       }
+               } else if (aclOperation.equals(AclOperation.DELETE)) {
+                       permission = new String(System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|destroy").split("\\|");
+
+               } else if (TOPIC_ADMIN_OPERATIONS.contains(aclOperation)) {
+                       permission = new String(System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|create").split("\\|");
+               }
+
+               return permission;
+       }
+
+       private String[] getAdminPermission() {
+
+               if (adminPermission[0] == null) {
+                       adminPermission[0] = System.getProperty("namespace") + ".kafka.access";
+                       adminPermission[1] = "*";
+                       adminPermission[2] = "*";
+               }
+
+               return adminPermission;
+       }
+
+       private String[] getPermission(AclOperation aclOperation, String resource, String topicName) {
+               String[] permission = new String[3];
+               switch (aclOperation) {
+
+               case ALTER:
+               case ALTER_CONFIGS:
+               case CREATE:
+               case DELETE:
+                       if (resource.equals("Topic")) {
+                               permission = getTopicPermission(topicName, aclOperation);
+                       } else if (resource.equals("Cluster")) {
+                               permission = getAdminPermission();
+                       }
+                       break;
+               case DESCRIBE_CONFIGS:
+               case READ:
+               case WRITE:
+                       if (resource.equals("Topic")) {
+                               permission = getTopicPermission(topicName, aclOperation);
+                       }
+                       break;
+               case IDEMPOTENT_WRITE:
+                       if (resource.equals("Cluster")) {
+                               permission = getAdminPermission();
+                       }
+                       break;
+               default:
+                       break;
+
+               }
+
+               return permission;
+
+       }
+
        @Override
        public boolean authorize(final Session arg0, final Operation arg1, final Resource arg2) {
                if (arg0.principal() == null) {
@@ -65,70 +149,49 @@ public class KafkaCustomAuthorizer implements Authorizer {
                String fullName = arg0.principal().getName();
                fullName = fullName != null ? fullName.trim() : fullName;
                String topicName = null;
-               String namspace = null;
-               String ins = null;
-               String type = null;
-               String action = null;
-
-               String kafkaactivity = arg1.name();
-
-               if (kafkaactivity.equals("Read")) {
-                       action = "sub";
-               } else if (kafkaactivity.equals("Write")) {
-                       action = "pub";
-               } else if (kafkaactivity.equals("Create")) {
-                       action = "create";
-               } else {
+               String[] permission = new String[3];
+
+               String resource = arg2.resourceType().name();
+
+               if (resource.equals("Topic")) {
+                       topicName = arg2.name();
+               }
+
+               if (fullName != null && fullName.equals(Cadi3AAFProvider.getKafkaUsername())) {
                        return true;
                }
 
-               if (arg2.resourceType().name().equals("Topic")) {
-                       topicName = arg2.name();
-               } else {
+               if (null != topicName && !topicName.startsWith("org.onap")) {
                        return true;
                }
 
-               try {
-
-                       if (null != topicName && topicName.indexOf(".") > 0) {
-
-                               if (action.equals("create")) {
-                                       String instancePart = (System.getenv("msgRtr.topicfactory.aaf") != null)
-                                                       ? System.getenv("msgRtr.topicfactory.aaf")
-                                                       : "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:";
-                                       String[] instandType = (instancePart + namspace + "|create").split("|");
-                                       ins = instandType[0];
-                                       type = instandType[1];
-                               } else if (action.equals("pub") || action.equals("sub")) {
-                                       namspace = topicName.substring(0, topicName.lastIndexOf("."));
-                                       String instancePart = (System.getenv("pubSubInstPart") != null) ? System.getenv("pubSubInstPart")
-                                                       : ".topic";
-                                       ins = namspace + instancePart;
-                                       type = ":topic." + topicName;
-                               }
-                               logger.info("^Event Received for topic " + topicName + " , User " + fullName + " , action = " + action);
-                       }
+               permission = getPermission(arg1.toJava(), resource, topicName);
 
-                       if (null != fullName && fullName.equals("admin")) {
-                               return true;
-                       }
+               if (permission[0] == null) {
+                       return true;
+               } else {
 
-                       if (null != topicName) {
-                               boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider()
-                                               .hasPermission(fullName, ins, type, action);
-                               if (hasResp) {
-                                       logger.info("Successful Authorization for " + fullName + " on " + topicName + " for " + ins + "|"
-                                                       + type + "|" + action);
-                               }
-                               if (!hasResp) {
-                                       logger.info(fullName + " is not allowed in " + ins + "|" + type + "|" + action);
-                                       return false;
+                       try {
+
+                               if (null != topicName) {
+                                       boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider()
+                                                       .hasPermission(fullName, permission[0], permission[1], permission[2]);
+                                       if (hasResp) {
+                                               logger.info("Successful Authorization for " + fullName + " on " + topicName + " for "
+                                                               + permission[0] + "|" + permission[1] + "|" + permission[2]);
+                                       }
+                                       if (!hasResp) {
+                                               logger.info(fullName + " is not allowed in " + permission[0] + "|" + permission[1] + "|"
+                                                               + permission[2]);
+                                               return false;
+                                       }
                                }
+                       } catch (final Exception e) {
+                               return false;
                        }
-               } catch (final Exception e) {
-                       return false;
+                       return true;
+
                }
-               return true;
        }
 
        @Override
index f28671b..508d583 100644 (file)
@@ -53,14 +53,9 @@ public class PlainSaslServer1 implements SaslServer {
 
        public static final String PLAIN_MECHANISM = "PLAIN";
 
-       private final JaasContext jaasContext;
-
        private boolean complete;
        private String authorizationID;
 
-       public PlainSaslServer1(JaasContext jaasContext) {
-               this.jaasContext = jaasContext;
-       }
 
        @Override
        public byte[] evaluateResponse(byte[] response) throws SaslException {
@@ -164,11 +159,7 @@ public class PlainSaslServer1 implements SaslServer {
                                throw new SaslException(
                                                String.format("Mechanism \'%s\' is not supported. Only PLAIN is supported.", mechanism));
 
-                       if (!(cbh instanceof SaslServerCallbackHandler))
-                               throw new SaslException(
-                                               "CallbackHandler must be of type SaslServerCallbackHandler, but it is: " + cbh.getClass());
-
-                       return new PlainSaslServer1(((SaslServerCallbackHandler) cbh).jaasContext());
+                       return new PlainSaslServer1();
                }
 
                @Override
index 4ac81f3..747e34e 100644 (file)
@@ -22,9 +22,11 @@ package org.onap.dmaap.commonauth.kafka.base.authorization;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
+@PowerMockIgnore({"javax.net.ssl.*", "javax.security.auth.*"})
 public class AuthorizationProviderFactoryTest {
 
        @Test
index 70631dc..743917d 100644 (file)
@@ -22,6 +22,7 @@ package org.onap.dmaap.commonauth.kafka.base.authorization;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.when;
 
@@ -40,7 +41,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 
 @RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.net.ssl.*")
+@PowerMockIgnore({"javax.net.ssl.*", "javax.security.auth.*"})
 public class Cadi3AAFProviderTest {
 
        public Cadi3AAFProvider cadi3AAFProvider;
@@ -60,36 +61,34 @@ public class Cadi3AAFProviderTest {
        @Before
        public void setUp() throws Exception {
                MockitoAnnotations.initMocks(this);
+               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
+               cadi3AAFProvider = new Cadi3AAFProvider();
        }
 
        @Test
        public void testHasPermission() {
-               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
-               cadi3AAFProvider = new Cadi3AAFProvider();
                assertFalse(cadi3AAFProvider.hasPermission("userID", "permission", "instance", "action"));
        }
 
        @Test
        public void testHasAdminPermission() {
-               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
-               cadi3AAFProvider = new Cadi3AAFProvider();
                assertEquals(cadi3AAFProvider.hasPermission("admin", "permission", "instance", "action"), true);
        }
        
        @Test(expected = NullPointerException.class)
        public void tesAuthenticate() throws Exception {
-               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
-               cadi3AAFProvider = new Cadi3AAFProvider();
                when(aafAuthn.validate("userId", "password")).thenReturn("valid");
                assertEquals(cadi3AAFProvider.authenticate("userId", "password"), "valid");
        }
 
        @Test
-       public void tesAuthenticateadmin() throws Exception {
-               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
-               cadi3AAFProvider = new Cadi3AAFProvider();
-               when(aafAuthn.validate("admin", "password")).thenReturn("valid");
-               assertNull(cadi3AAFProvider.authenticate("admin", "password"));
+       public void tesAuthenticateAdmin() throws Exception {
+               assertNull(cadi3AAFProvider.authenticate("kafkaUsername", "apiKey"));
+       }
+       
+       @Test
+       public void tesAuthenticateAdminwtWrongCred() throws Exception {
+               assertNotNull(cadi3AAFProvider.authenticate("kafkaUsername", "api"));
        }
        
 }
index 4793acf..ae76534 100644 (file)
  *******************************************************************************/
 package org.onap.dmaap.kafkaAuthorize;
 
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,7 +43,7 @@ import kafka.security.auth.Resource;
 import kafka.security.auth.ResourceType;
 
 @RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.net.ssl.*")
+@PowerMockIgnore({"javax.net.ssl.*", "javax.security.auth.*"})
 @PrepareForTest({ AuthorizationProviderFactory.class })
 public class KafkaCustomAuthorizerTest {
        @Mock
@@ -84,7 +86,7 @@ public class KafkaCustomAuthorizerTest {
        @Test
        public void testAuthorizerSuccess() {
 
-
+               
                PowerMockito.when(provider.hasPermission("fullName", "namespace.topic", ":topic.namespace.Topic", "pub"))
                                .thenReturn(true);
                authorizer = new KafkaCustomAuthorizer();
@@ -95,6 +97,61 @@ public class KafkaCustomAuthorizerTest {
        @Test
        public void testAuthorizerFailure() {
                System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
+               PowerMockito.when(arg2.name()).thenReturn("org.onap.dmaap.mr.testtopic");
+               PowerMockito.when(arg1.toJava()).thenReturn(AclOperation.CREATE);
+               System.setProperty("msgRtr.topicfactory.aaf", "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:");
+               PowerMockito.when(provider.hasPermission("fullName", "namespace.topic", ":topic.namespace.Topic", "pub"))
+                               .thenReturn(false);
+               authorizer = new KafkaCustomAuthorizer();
+               try {
+                       authorizer.authorize(arg0, arg1, arg2);
+               } catch (Exception e) {
+                       assertTrue(true);
+               }
+
+       }
+       
+       @Test
+       public void testAuthorizerFailure1() {
+               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
+               PowerMockito.when(arg2.name()).thenReturn("org.onap.dmaap.mr.testtopic");
+               PowerMockito.when(resourceType.name()).thenReturn("Cluster");
+               PowerMockito.when(arg1.toJava()).thenReturn(AclOperation.CREATE);
+               System.setProperty("msgRtr.topicfactory.aaf", "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:");
+               PowerMockito.when(provider.hasPermission("fullName", "namespace.topic", ":topic.namespace.Topic", "pub"))
+                               .thenReturn(false);
+               authorizer = new KafkaCustomAuthorizer();
+               try {
+                       authorizer.authorize(arg0, arg1, arg2);
+               } catch (Exception e) {
+                       assertTrue(true);
+               }
+
+       }
+       
+       @Test
+       public void testAuthorizerFailure2() {
+               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
+               PowerMockito.when(arg2.name()).thenReturn("org.onap.dmaap.mr.testtopic");
+               PowerMockito.when(resourceType.name()).thenReturn("Topic");
+               PowerMockito.when(arg1.toJava()).thenReturn(AclOperation.WRITE);
+               PowerMockito.when(provider.hasPermission("fullName", "namespace.topic", ":topic.namespace.Topic", "pub"))
+                               .thenReturn(false);
+               authorizer = new KafkaCustomAuthorizer();
+               try {
+                       authorizer.authorize(arg0, arg1, arg2);
+               } catch (Exception e) {
+                       assertTrue(true);
+               }
+
+       }
+       
+       @Test
+       public void testAuthorizerFailure3() {
+               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
+               PowerMockito.when(arg2.name()).thenReturn("org.onap.dmaap.mr.testtopic");
+               PowerMockito.when(resourceType.name()).thenReturn("Topic");
+               PowerMockito.when(arg1.toJava()).thenReturn(AclOperation.DESCRIBE);
                PowerMockito.when(provider.hasPermission("fullName", "namespace.topic", ":topic.namespace.Topic", "pub"))
                                .thenReturn(false);
                authorizer = new KafkaCustomAuthorizer();
@@ -105,5 +162,57 @@ public class KafkaCustomAuthorizerTest {
                }
 
        }
+       @Test
+       public void testAuthorizerFailure4() {
+               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
+               PowerMockito.when(arg2.name()).thenReturn("org.onap.dmaap.mr.testtopic");
+               PowerMockito.when(resourceType.name()).thenReturn("Topic");
+               PowerMockito.when(arg1.toJava()).thenReturn(AclOperation.READ);
+               PowerMockito.when(provider.hasPermission("fullName", "namespace.topic", ":topic.namespace.Topic", "pub"))
+                               .thenReturn(false);
+               authorizer = new KafkaCustomAuthorizer();
+               try {
+                       authorizer.authorize(arg0, arg1, arg2);
+               } catch (Exception e) {
+                       assertTrue(true);
+               }
+
+       }
+       
+       @Test
+       public void testAuthorizerFailure5() {
+               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
+               PowerMockito.when(arg2.name()).thenReturn("org.onap.dmaap.mr.testtopic");
+               PowerMockito.when(resourceType.name()).thenReturn("Cluster");
+               PowerMockito.when(arg1.toJava()).thenReturn(AclOperation.IDEMPOTENT_WRITE);
+               System.setProperty("msgRtr.topicfactory.aaf", "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:");
+               PowerMockito.when(provider.hasPermission("fullName", "namespace.topic", ":topic.namespace.Topic", "pub"))
+                               .thenReturn(false);
+               authorizer = new KafkaCustomAuthorizer();
+               try {
+                       authorizer.authorize(arg0, arg1, arg2);
+               } catch (Exception e) {
+                       assertTrue(true);
+               }
+
+       }
+       
+       @Test
+       public void testAuthorizerFailure6() {
+               System.setProperty("CADI_PROPERTIES", "src/test/resources/cadi.properties");
+               PowerMockito.when(arg2.name()).thenReturn("org.onap.dmaap.mr.testtopic");
+               PowerMockito.when(arg1.toJava()).thenReturn(AclOperation.DELETE);
+               System.setProperty("msgRtr.topicfactory.aaf", "org.onap.dmaap.mr.topicFactory|:org.onap.dmaap.mr.topic:");
+               PowerMockito.when(provider.hasPermission("fullName", "namespace.topic", ":topic.namespace.Topic", "pub"))
+                               .thenReturn(false);
+               authorizer = new KafkaCustomAuthorizer();
+               try {
+                       authorizer.authorize(arg0, arg1, arg2);
+               } catch (Exception e) {
+                       assertTrue(true);
+               }
+
+       }
+       
 
 }
index 3e73062..c6d89f3 100644 (file)
@@ -34,14 +34,16 @@ import org.mockito.MockitoAnnotations;
 import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProvider;
 import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.security.auth.*")
 @PrepareForTest({ AuthorizationProviderFactory.class })
 public class PlainSaslServer1Test {
 
-       PlainSaslServer1 sslServer = new PlainSaslServer1(null);
+       PlainSaslServer1 sslServer = new PlainSaslServer1();
        @Mock
        JaasContext jaasContext;
        @Mock