[DMAAP-KAFKA] Fix sonar coverage etc
[dmaap/kafka11aaf.git] / src / main / java / org / onap / dmaap / kafkaauthorize / KafkaCustomAuthorizer.java
@@ -3,6 +3,7 @@
  *  org.onap.dmaap
  *  ================================================================================
  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  Modification copyright (C) 2021 Nordix Foundation.
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -18,7 +19,7 @@
  *  
  *  
  *******************************************************************************/
-package org.onap.dmaap.kafkaAuthorize;
+package org.onap.dmaap.kafkaauthorize;
 
 import java.util.EnumSet;
 import java.util.Map;
@@ -45,12 +46,13 @@ import scala.collection.immutable.Set;
  */
 public class KafkaCustomAuthorizer implements Authorizer {
 
-       private String[] adminPermission = new String[3];
-       public static final EnumSet<AclOperation> TOPIC_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.DESCRIBE_CONFIGS);
-       public static final EnumSet<AclOperation> TOPIC_READ_WRITE_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.WRITE,
+       private final String[] adminPermission = new String[3];
+       protected static final EnumSet<AclOperation> TOPIC_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.DESCRIBE_CONFIGS);
+       protected static final EnumSet<AclOperation> TOPIC_READ_WRITE_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.WRITE,
                        AclOperation.READ, AclOperation.DESCRIBE_CONFIGS);
-       public static final EnumSet<AclOperation> TOPIC_ADMIN_OPERATIONS = EnumSet.of(AclOperation.ALTER,
+       protected static final EnumSet<AclOperation> TOPIC_ADMIN_OPERATIONS = EnumSet.of(AclOperation.ALTER,
                        AclOperation.ALTER_CONFIGS, AclOperation.CREATE);
+       static final String TOPIC = "Topic";
 
        private static final Logger logger = LoggerFactory.getLogger(KafkaCustomAuthorizer.class);
 
@@ -85,10 +87,10 @@ public class KafkaCustomAuthorizer implements Authorizer {
 
                        }
                } else if (aclOperation.equals(AclOperation.DELETE)) {
-                       permission = new String(System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|destroy").split("\\|");
+                       permission = (System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|destroy").split("\\|");
 
                } else if (TOPIC_ADMIN_OPERATIONS.contains(aclOperation)) {
-                       permission = new String(System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|create").split("\\|");
+                       permission = (System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|create").split("\\|");
                }
 
                return permission;
@@ -113,7 +115,7 @@ public class KafkaCustomAuthorizer implements Authorizer {
                case ALTER_CONFIGS:
                case CREATE:
                case DELETE:
-                       if (resource.equals("Topic")) {
+                       if (resource.equals(TOPIC)) {
                                permission = getTopicPermission(topicName, aclOperation);
                        } else if (resource.equals("Cluster")) {
                                permission = getAdminPermission();
@@ -122,7 +124,7 @@ public class KafkaCustomAuthorizer implements Authorizer {
                case DESCRIBE_CONFIGS:
                case READ:
                case WRITE:
-                       if (resource.equals("Topic")) {
+                       if (resource.equals(TOPIC)) {
                                permission = getTopicPermission(topicName, aclOperation);
                        }
                        break;
@@ -135,7 +137,6 @@ public class KafkaCustomAuthorizer implements Authorizer {
                        break;
 
                }
-
                return permission;
 
        }
@@ -149,11 +150,11 @@ public class KafkaCustomAuthorizer implements Authorizer {
                String fullName = arg0.principal().getName();
                fullName = fullName != null ? fullName.trim() : fullName;
                String topicName = null;
-               String[] permission = new String[3];
+               String[] permission;
 
                String resource = arg2.resourceType().name();
 
-               if (resource.equals("Topic")) {
+               if (resource.equals(TOPIC)) {
                        topicName = arg2.name();
                }
 
@@ -167,31 +168,32 @@ public class KafkaCustomAuthorizer implements Authorizer {
 
                permission = getPermission(arg1.toJava(), resource, topicName);
 
-               if (permission[0] == null) {
-                       return true;
-               } else {
-
-                       try {
-
-                               if (null != topicName) {
-                                       boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider()
-                                                       .hasPermission(fullName, permission[0], permission[1], permission[2]);
-                                       if (hasResp) {
-                                               logger.info("Successful Authorization for " + fullName + " on " + topicName + " for "
-                                                               + permission[0] + "|" + permission[1] + "|" + permission[2]);
-                                       }
-                                       if (!hasResp) {
-                                               logger.info(fullName + " is not allowed in " + permission[0] + "|" + permission[1] + "|"
-                                                               + permission[2]);
-                                               return false;
-                                       }
+               if (permission[0] != null) {
+                       return !checkPermissions(fullName, topicName, permission);
+               }
+               return true;
+       }
+
+       private boolean checkPermissions(String fullName, String topicName, String[] permission) {
+               try {
+
+                       if (null != topicName) {
+                               boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider()
+                                       .hasPermission(fullName, permission[0], permission[1], permission[2]);
+                               if (hasResp) {
+                                       logger.info("Successful Authorization for {} on {} for {} | {} | {}", fullName, topicName,
+                                               permission[0], permission[1], permission[2]);
+                               }
+                               if (!hasResp) {
+                                       logger.info("{} is not allowed in {} | {} | {}", fullName, permission[0], permission[1],
+                                               permission[2]);
+                                       return true;
                                }
-                       } catch (final Exception e) {
-                               return false;
                        }
+               } catch (final Exception e) {
                        return true;
-
                }
+               return false;
        }
 
        @Override