1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.dmf.mr.backends.memory;
24 import java.util.HashMap;
25 import java.util.LinkedList;
26 import java.util.List;
28 import java.util.TreeSet;
30 import com.att.dmf.mr.metabroker.Broker;
31 import com.att.dmf.mr.metabroker.Topic;
32 import com.att.nsa.configs.ConfigDb;
33 import com.att.nsa.security.NsaAcl;
34 import com.att.nsa.security.NsaApiKey;
38 * @author anowarul.islam
41 public class MemoryMetaBroker implements Broker {
43 private final MemoryQueue fQueue;
44 private final HashMap<String, MemTopic> fTopics;
/**
 * Creates a broker that keeps its topic registry in memory.
 *
 * @param mq       queue that createTopic and deleteTopic forward topic
 *                 creation and removal to
 * @param configDb not read by this constructor
 */
public MemoryMetaBroker(MemoryQueue mq, ConfigDb configDb) {
    // fQueue is declared final, so it has to be assigned exactly once here.
    fQueue = mq;
    fTopics = new HashMap<>();
}
59 public List<Topic> getAllTopics() {
60 return new LinkedList<Topic>(fTopics.values());
64 public Topic getTopic(String topic) {
65 return fTopics.get(topic);
69 public Topic createTopic(String topic, String desc, String ownerApiId, int partitions, int replicas,
70 boolean transactionEnabled) throws TopicExistsException {
71 if (getTopic(topic) != null) {
72 throw new TopicExistsException(topic);
74 fQueue.createTopic(topic);
75 fTopics.put(topic, new MemTopic(topic, desc, ownerApiId, transactionEnabled));
76 return getTopic(topic);
80 public void deleteTopic(String topic) {
81 fTopics.remove(topic);
82 fQueue.removeTopic(topic);
85 private static class MemTopic implements Topic {
87 private final String fName;
88 private final String fDesc;
89 private final String fOwner;
90 private NsaAcl fReaders;
91 private NsaAcl fWriters;
92 private boolean ftransactionEnabled;
93 private String accessDenied = "User does not own this topic ";
/**
 * Initializes the topic metadata. The reader and writer ACLs are left
 * unset until a permit call creates them.
 *
 * @param name               topic name
 * @param desc               topic description
 * @param owner              key of the API user that owns the topic
 * @param transactionEnabled whether transactions are enabled for the topic
 */
public MemTopic(String name, String desc, String owner, boolean transactionEnabled) {
    // fName, fDesc and fOwner are final and must be assigned in the constructor.
    fName = name;
    fDesc = desc;
    fOwner = owner;
    ftransactionEnabled = transactionEnabled;
}
113 public String getOwner() {
118 public NsaAcl getReaderAcl() {
123 public NsaAcl getWriterAcl() {
128 public void checkUserRead(NsaApiKey user) throws AccessDeniedException {
129 if (fReaders != null && (user == null || !fReaders.canUser(user.getKey()))) {
130 throw new AccessDeniedException(user == null ? "" : user.getKey());
135 public void checkUserWrite(NsaApiKey user) throws AccessDeniedException {
136 if (fWriters != null && (user == null || !fWriters.canUser(user.getKey()))) {
137 throw new AccessDeniedException(user == null ? "" : user.getKey());
142 public String getName() {
147 public String getDescription() {
152 public void permitWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
153 if (!fOwner.equals(asUser.getKey())) {
154 throw new AccessDeniedException(accessDenied + fName);
156 if (fWriters == null) {
157 fWriters = new NsaAcl();
159 fWriters.add(publisherId);
163 public void denyWritesFromUser(String publisherId, NsaApiKey asUser) throws AccessDeniedException {
164 if (!fOwner.equals(asUser.getKey())) {
165 throw new AccessDeniedException(accessDenied + fName);
167 fWriters.remove(publisherId);
171 public void permitReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
172 if (!fOwner.equals(asUser.getKey())) {
173 throw new AccessDeniedException(accessDenied + fName);
175 if (fReaders == null) {
176 fReaders = new NsaAcl();
178 fReaders.add(consumerId);
182 public void denyReadsByUser(String consumerId, NsaApiKey asUser) throws AccessDeniedException {
183 if (!fOwner.equals(asUser.getKey())) {
184 throw new AccessDeniedException(accessDenied + fName);
186 fReaders.remove(consumerId);
190 public boolean isTransactionEnabled() {
191 return ftransactionEnabled;
195 public Set<String> getOwners() {
196 final TreeSet<String> set = new TreeSet<> ();