[DMAAP-KAFKA] Release image 1.1.0
[dmaap/kafka11aaf.git] / src / main / java / org / onap / dmaap / kafkaauthorize / KafkaCustomAuthorizer.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  Modification copyright (C) 2021 Nordix Foundation.
7  *  ================================================================================
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *        http://www.apache.org/licenses/LICENSE-2.0
12 *  
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *  ============LICENSE_END=========================================================
19  *  
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.kafkaauthorize;
23
24 import java.util.EnumSet;
25 import java.util.Map;
26
27 import org.apache.kafka.common.acl.AclOperation;
28 import org.apache.kafka.common.security.auth.KafkaPrincipal;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import org.onap.dmaap.commonauth.kafka.base.authorization.AuthorizationProviderFactory;
33 import org.onap.dmaap.commonauth.kafka.base.authorization.Cadi3AAFProvider;
34
35 import kafka.network.RequestChannel.Session;
36 import kafka.security.auth.Acl;
37 import kafka.security.auth.Authorizer;
38 import kafka.security.auth.Operation;
39 import kafka.security.auth.Resource;
40 import scala.collection.immutable.Set;
41
42 /**
43  * A trivial Kafka Authorizer for use with SSL and AAF
44  * Authentication/Authorization.
45  * 
46  */
47 public class KafkaCustomAuthorizer implements Authorizer {
48
49         private final String[] adminPermission = new String[3];
50         protected static final EnumSet<AclOperation> TOPIC_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.DESCRIBE_CONFIGS);
51         protected static final EnumSet<AclOperation> TOPIC_READ_WRITE_DESCRIBE_OPERATIONS = EnumSet.of(AclOperation.WRITE,
52                         AclOperation.READ, AclOperation.DESCRIBE_CONFIGS);
53         protected static final EnumSet<AclOperation> TOPIC_ADMIN_OPERATIONS = EnumSet.of(AclOperation.ALTER,
54                         AclOperation.ALTER_CONFIGS, AclOperation.CREATE);
55         static final String TOPIC = "Topic";
56
57         private static final Logger logger = LoggerFactory.getLogger(KafkaCustomAuthorizer.class);
58
59         @Override
60         public void configure(final Map<String, ?> arg0) {
61                 // TODO Auto-generate method stub
62         }
63
64         @Override
65         public void addAcls(final Set<Acl> arg0, final Resource arg1) {
66                 // TODO Auto-generated method stub
67
68         }
69
70         private String[] getTopicPermission(String topicName, AclOperation aclOperation) {
71
72                 String namspace = topicName.substring(0, topicName.lastIndexOf("."));
73                 String[] permission = new String[3];
74                 if (TOPIC_READ_WRITE_DESCRIBE_OPERATIONS.contains(aclOperation)) {
75                         permission[0] = namspace + ".topic";
76                         String instancePart = (System.getenv("pubSubInstPart") != null) ? System.getenv("pubSubInstPart")
77                                         : ".topic";
78                         permission[1] = instancePart + topicName;
79
80                         if (aclOperation.equals(AclOperation.WRITE)) {
81                                 permission[2] = "pub";
82                         } else if (aclOperation.equals(AclOperation.READ)) {
83                                 permission[2] = "sub";
84
85                         } else if (TOPIC_DESCRIBE_OPERATIONS.contains(aclOperation)) {
86                                 permission[2] = "view";
87
88                         }
89                 } else if (aclOperation.equals(AclOperation.DELETE)) {
90                         permission = (System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|destroy").split("\\|");
91
92                 } else if (TOPIC_ADMIN_OPERATIONS.contains(aclOperation)) {
93                         permission = (System.getProperty("msgRtr.topicfactory.aaf") + namspace + "|create").split("\\|");
94                 }
95
96                 return permission;
97         }
98
99         private String[] getAdminPermission() {
100
101                 if (adminPermission[0] == null) {
102                         adminPermission[0] = System.getProperty("namespace") + ".kafka.access";
103                         adminPermission[1] = "*";
104                         adminPermission[2] = "*";
105                 }
106
107                 return adminPermission;
108         }
109
110         private String[] getPermission(AclOperation aclOperation, String resource, String topicName) {
111                 String[] permission = new String[3];
112                 switch (aclOperation) {
113
114                 case ALTER:
115                 case ALTER_CONFIGS:
116                 case CREATE:
117                 case DELETE:
118                         if (resource.equals(TOPIC)) {
119                                 permission = getTopicPermission(topicName, aclOperation);
120                         } else if (resource.equals("Cluster")) {
121                                 permission = getAdminPermission();
122                         }
123                         break;
124                 case DESCRIBE_CONFIGS:
125                 case READ:
126                 case WRITE:
127                         if (resource.equals(TOPIC)) {
128                                 permission = getTopicPermission(topicName, aclOperation);
129                         }
130                         break;
131                 case IDEMPOTENT_WRITE:
132                         if (resource.equals("Cluster")) {
133                                 permission = getAdminPermission();
134                         }
135                         break;
136                 default:
137                         break;
138
139                 }
140                 return permission;
141
142         }
143
144         @Override
145         public boolean authorize(final Session arg0, final Operation arg1, final Resource arg2) {
146                 if (arg0.principal() == null) {
147                         return false;
148                 }
149
150                 String fullName = arg0.principal().getName();
151                 fullName = fullName != null ? fullName.trim() : fullName;
152                 String topicName = null;
153                 String[] permission;
154
155                 String resource = arg2.resourceType().name();
156
157                 if (resource.equals(TOPIC)) {
158                         topicName = arg2.name();
159                 }
160
161                 if (fullName != null && fullName.equals(Cadi3AAFProvider.getKafkaUsername())) {
162                         return true;
163                 }
164
165                 if ((!Cadi3AAFProvider.isCadiEnabled())||(null != topicName && !topicName.startsWith("org.onap"))) {
166                         return true;
167                 }
168
169                 permission = getPermission(arg1.toJava(), resource, topicName);
170
171                 if (permission[0] != null) {
172                         return !checkPermissions(fullName, topicName, permission);
173                 }
174                 return true;
175         }
176
177         private boolean checkPermissions(String fullName, String topicName, String[] permission) {
178                 try {
179
180                         if (null != topicName) {
181                                 boolean hasResp = AuthorizationProviderFactory.getProviderFactory().getProvider()
182                                         .hasPermission(fullName, permission[0], permission[1], permission[2]);
183                                 if (hasResp) {
184                                         logger.info("Successful Authorization for {} on {} for {} | {} | {}", fullName, topicName,
185                                                 permission[0], permission[1], permission[2]);
186                                 }
187                                 if (!hasResp) {
188                                         logger.info("{} is not allowed in {} | {} | {}", fullName, permission[0], permission[1],
189                                                 permission[2]);
190                                         return true;
191                                 }
192                         }
193                 } catch (final Exception e) {
194                         return true;
195                 }
196                 return false;
197         }
198
199         @Override
200         public void close() {
201                 // TODO Auto-generated method stub
202
203         }
204
205         @Override
206         public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls() {
207                 // TODO Auto-generated method stub
208                 return null;
209         }
210
211         @Override
212         public scala.collection.immutable.Map<Resource, Set<Acl>> getAcls(final KafkaPrincipal arg0) {
213                 // TODO Auto-generated method stub
214                 return null;
215         }
216
217         @Override
218         public boolean removeAcls(final Resource arg0) {
219                 // TODO Auto-generated method stub
220                 return false;
221         }
222
223         @Override
224         public boolean removeAcls(final Set<Acl> arg0, final Resource arg1) {
225                 // TODO Auto-generated method stub
226                 return false;
227         }
228
229         public Set<Acl> getAcls(Resource arg0) {
230                 // TODO Auto-generated method stub
231                 return null;
232         }
233 }