/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
 * =================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import static org.onap.dmaap.util.DMaaPAuthFilter.isUseCustomAcls;
26 import com.att.ajsc.beans.PropertiesMapBean;
27 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30 import com.att.nsa.configs.ConfigDbException;
31 import com.att.nsa.security.NsaAcl;
32 import com.att.nsa.security.NsaApiKey;
33 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
34 import joptsimple.internal.Strings;
35 import org.apache.commons.lang.StringUtils;
36 import org.apache.commons.lang.math.NumberUtils;
37 import org.apache.http.HttpStatus;
38 import org.json.JSONArray;
39 import org.json.JSONException;
40 import org.json.JSONObject;
41 import org.onap.dmaap.dmf.mr.CambriaApiException;
42 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
43 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
44 import org.onap.dmaap.dmf.mr.beans.TopicBean;
45 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
47 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
48 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
49 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
50 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
51 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
52 import org.onap.dmaap.dmf.mr.metabroker.Topic;
53 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
54 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
55 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
56 import org.onap.dmaap.dmf.mr.service.TopicService;
57 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
58 import org.onap.dmaap.dmf.mr.utils.Utils;
59 import org.springframework.beans.factory.annotation.Autowired;
60 import org.springframework.stereotype.Service;
62 import javax.servlet.http.HttpServletRequest;
63 import java.io.IOException;
64 import java.security.Principal;
/**
 * @author muzainulhaque.qazi
 */
71 public class TopicServiceImpl implements TopicService {
73 private static final String TOPIC_CREATE_OP = "create";
74 private static final EELFLogger LOGGER = EELFManager.getLogger(TopicServiceImpl.class);
76 private DMaaPErrorMessages errorMessages;
78 public DMaaPErrorMessages getErrorMessages() {
82 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
83 this.errorMessages = errorMessages;
87 String getPropertyFromAJSCbean(String propertyKey) {
88 return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
91 String getPropertyFromAJSCmap(String propertyKey) {
92 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
95 NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) {
96 return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
99 void respondOk(DMaaPContext context, String msg) {
100 DMaaPResponseBuilder.respondOkWithHtml(context, msg);
103 void respondOk(DMaaPContext context, JSONObject json) throws IOException {
104 DMaaPResponseBuilder.respondOk(context, json);
107 boolean isCadiEnabled() {
108 return Utils.isCadiEnabled();
111 * @param dmaapContext
112 * @throws JSONException
113 * @throws ConfigDbException
114 * @throws IOException
118 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
119 LOGGER.info("Fetching list of all the topics.");
120 JSONObject json = new JSONObject();
121 JSONArray topicsList = new JSONArray();
122 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
123 topicsList.put(topic.getName());
125 json.put("topics", topicsList);
126 LOGGER.info("Returning list of all the topics.");
127 respondOk(dmaapContext, json);
131 * @param dmaapContext
132 * @throws JSONException
133 * @throws ConfigDbException
134 * @throws IOException
137 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
138 LOGGER.info("Fetching list of all the topics.");
139 JSONObject json = new JSONObject();
140 JSONArray topicsList = new JSONArray();
141 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
142 JSONObject obj = new JSONObject();
143 obj.put("topicName", topic.getName());
145 obj.put("owner", topic.getOwner());
146 obj.put("txenabled", topic.isTransactionEnabled());
149 json.put("topics", topicsList);
150 LOGGER.info("Returning list of all the topics.");
151 respondOk(dmaapContext, json);
155 * @param dmaapContext
157 * @throws ConfigDbException
158 * @throws IOException
159 * @throws TopicExistsException
162 public void getTopic(DMaaPContext dmaapContext, String topicName)
163 throws ConfigDbException, IOException, TopicExistsException {
164 LOGGER.info("Fetching details of topic " + topicName);
165 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
167 LOGGER.error("Topic [" + topicName + "] does not exist.");
168 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
170 JSONObject o = new JSONObject();
171 o.put("name", t.getName());
172 o.put("description", t.getDescription());
173 if (null != t.getOwners())
174 o.put("owner", t.getOwners().iterator().next());
175 if (null != t.getReaderAcl())
176 o.put("readerAcl", aclToJson(t.getReaderAcl()));
177 if (null != t.getWriterAcl())
178 o.put("writerAcl", aclToJson(t.getWriterAcl()));
179 LOGGER.info("Returning details of topic " + topicName);
180 respondOk(dmaapContext, o);
184 * @param dmaapContext
186 * @throws CambriaApiException
187 * @throws AccessDeniedException
188 * @throws IOException
189 * @throws TopicExistsException
190 * @throws JSONException
196 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException {
197 String topicName = topicBean.getTopicName();
198 LOGGER.info("Creating topic {}",topicName);
199 String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
201 final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions");
202 final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas");
203 final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
204 key, partitions, replicas, topicBean.isTransactionEnabled());
205 LOGGER.info("Topic {} created successfully. Sending response", topicName);
206 respondOk(dmaapContext, topicToJson(t));
207 } catch (JSONException ex) {
208 LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
209 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
210 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
211 LOGGER.info(errRes.toString());
212 throw new CambriaApiException(errRes);
213 } catch (ConfigDbException ex) {
214 LOGGER.error("Failed to create topic "+ topicName +". Config DB Exception", ex);
215 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
216 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
217 LOGGER.info(errRes.toString());
218 throw new CambriaApiException(errRes);
219 } catch (Broker1.TopicExistsException ex) {
220 LOGGER.error( "Failed to create topic "+ topicName +". Topic already exists.",ex);
224 private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException {
225 String clientId = Strings.EMPTY;
226 if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) {
227 LOGGER.info("Performing AAF authorization for topic {} creation.", topicName);
228 String permission = buildPermission(topicName, operation);
229 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
230 clientId = getAAFclientId(dmaapContext.getRequest());
232 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
233 LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}",
234 operation, topicName, clientId, permission);
235 throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
236 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
237 "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission));
239 } else if (operation.equals(TOPIC_CREATE_OP)){
240 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
241 clientId = (user != null) ? user.getKey() : Strings.EMPTY;
246 private String getAAFclientId(HttpServletRequest request) {
247 Principal principal = request.getUserPrincipal();
248 if (principal !=null) {
249 return principal.getName();
251 LOGGER.warn("Performing AAF authorization but user has not been provided in request.");
256 private boolean isTopicWithEnforcedAuthorization(String topicName) {
257 String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF");
258 return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace);
261 int getValueOrDefault(int value, String defaultProperty) {
262 int returnValue = value;
263 if (returnValue <= 0) {
264 String defaultValue = getPropertyFromAJSCmap(defaultProperty);
265 returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1;
266 returnValue = (returnValue <= 0) ? 1 : returnValue;
271 private String buildPermission(String topicName, String operation) {
272 String nameSpace = (topicName.indexOf('.') > 1) ?
273 topicName.substring(0, topicName.lastIndexOf('.')) : "";
275 String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf");
276 return mrFactoryValue + nameSpace + "|" + operation;
280 * @param dmaapContext
282 * @throws ConfigDbException
283 * @throws IOException
284 * @throws TopicExistsException
285 * @throws CambriaApiException
286 * @throws AccessDeniedException
289 public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
290 CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
291 LOGGER.info(" Deleting topic " + topicName);
292 authorizeClient(dmaapContext, topicName, "destroy");
293 final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
295 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
296 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
298 // metabroker.deleteTopic(topicName);
299 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
300 respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
305 * @param dmaapContext
308 DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
309 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
313 * @param dmaapContext
315 * @throws ConfigDbException
316 * @throws IOException
317 * @throws TopicExistsException
321 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
322 throws ConfigDbException, IOException, TopicExistsException {
323 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
324 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
326 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
327 throw new TopicExistsException(
328 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
330 final NsaAcl acl = topic.getWriterAcl();
331 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
332 respondOk(dmaapContext, aclToJson(acl));
340 private static JSONObject aclToJson(NsaAcl acl) {
341 final JSONObject o = new JSONObject();
343 o.put("enabled", false);
344 o.put("users", new JSONArray());
346 o.put("enabled", acl.isActive());
348 final JSONArray a = new JSONArray();
349 for (String user : acl.getUsers()) {
358 * @param dmaapContext
362 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
363 throws IOException, ConfigDbException, TopicExistsException {
364 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
365 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
367 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
368 throw new TopicExistsException(
369 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
371 final NsaAcl acl = topic.getReaderAcl();
372 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
373 respondOk(dmaapContext, aclToJson(acl));
382 static JSONObject topicToJson(Topic t) {
383 final JSONObject o = new JSONObject();
385 o.put("name", t.getName());
386 o.put("description", t.getDescription());
387 o.put("owner", t.getOwner());
388 o.put("readerAcl", aclToJson(t.getReaderAcl()));
389 o.put("writerAcl", aclToJson(t.getWriterAcl()));
395 * @param dmaapContext
396 * @param topicName @param producerId @throws
397 * ConfigDbException @throws IOException @throws
398 * TopicExistsException @throws AccessDeniedException @throws
402 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
403 throws AccessDeniedException, ConfigDbException, TopicExistsException {
404 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
405 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
406 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
408 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
409 + "] does not exist.");
410 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
411 + "] for topic. Topic [" + topicName + "] does not exist.");
413 if (isUseCustomAcls()) {
414 topic.permitWritesFromUser(producerId, user);
415 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
416 + "]. Sending response.");
418 LOGGER.info("Ignoring acl update");
420 respondOk(dmaapContext, "Write access has been granted to publisher.");
424 * @param dmaapContext
427 * @throws ConfigDbException
428 * @throws IOException
429 * @throws TopicExistsException
430 * @throws AccessDeniedException
431 * @throws DMaaPAccessDeniedException
435 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
436 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
437 DMaaPAccessDeniedException {
438 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
439 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
440 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
442 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
443 + "] does not exist.");
444 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
445 + "] for topic. Topic [" + topicName + "] does not exist.");
447 if (isUseCustomAcls()) {
448 topic.denyWritesFromUser(producerId, user);
449 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
450 + "]. Sending response.");
452 LOGGER.info("Ignoring acl update");
454 respondOk(dmaapContext, "Write access has been revoked for publisher.");
458 * @param dmaapContext
461 * @throws DMaaPAccessDeniedException
464 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
465 throws AccessDeniedException, ConfigDbException, TopicExistsException {
467 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
468 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
469 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
471 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
472 + "] does not exist.");
473 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
474 + "] for topic. Topic [" + topicName + "] does not exist.");
476 if (isUseCustomAcls()) {
477 topic.permitReadsByUser(consumerId, user);
478 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
479 + "]. Sending response.");
481 LOGGER.info("Ignoring acl update");
483 respondOk(dmaapContext,
484 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
488 * @param dmaapContext
491 * @throws DMaaPAccessDeniedException
494 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
495 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
496 DMaaPAccessDeniedException {
498 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
499 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
500 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
502 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
503 + "] does not exist.");
504 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
505 + "] for topic. Topic [" + topicName + "] does not exist.");
507 if (isUseCustomAcls()) {
508 topic.denyReadsByUser(consumerId, user);
509 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
510 + "]. Sending response.");
512 LOGGER.info("Ignoring acl update");
514 respondOk(dmaapContext,
515 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");