2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2019 Nokia Intellectual Property. All rights reserved.
8 * =================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.dmaap.dmf.mr.service.impl;
24 import com.att.ajsc.beans.PropertiesMapBean;
25 import com.att.ajsc.filemonitor.AJSCPropertiesMap;
26 import com.att.eelf.configuration.EELFLogger;
27 import com.att.eelf.configuration.EELFManager;
28 import com.att.nsa.configs.ConfigDbException;
29 import com.att.nsa.security.NsaAcl;
30 import com.att.nsa.security.NsaApiKey;
31 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
32 import joptsimple.internal.Strings;
33 import org.apache.commons.lang.StringUtils;
34 import org.apache.commons.lang.math.NumberUtils;
35 import org.apache.http.HttpStatus;
36 import org.json.JSONArray;
37 import org.json.JSONException;
38 import org.json.JSONObject;
39 import org.onap.dmaap.dmf.mr.CambriaApiException;
40 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
41 import org.onap.dmaap.dmf.mr.beans.DMaaPKafkaMetaBroker;
42 import org.onap.dmaap.dmf.mr.beans.TopicBean;
43 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
44 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
45 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
47 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
48 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
49 import org.onap.dmaap.dmf.mr.metabroker.Broker1;
50 import org.onap.dmaap.dmf.mr.metabroker.Topic;
51 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
52 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
53 import org.onap.dmaap.dmf.mr.security.DMaaPAuthenticatorImpl;
54 import org.onap.dmaap.dmf.mr.service.TopicService;
55 import org.onap.dmaap.dmf.mr.utils.DMaaPResponseBuilder;
56 import org.onap.dmaap.dmf.mr.utils.Utils;
57 import org.springframework.beans.factory.annotation.Autowired;
58 import org.springframework.stereotype.Service;
60 import javax.servlet.http.HttpServletRequest;
61 import java.io.IOException;
62 import java.security.Principal;
65 * @author muzainulhaque.qazi
69 public class TopicServiceImpl implements TopicService {
71 private static final String TOPIC_CREATE_OP = "create";
72 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
74 private DMaaPErrorMessages errorMessages;
76 public DMaaPErrorMessages getErrorMessages() {
80 public void setErrorMessages(DMaaPErrorMessages errorMessages) {
81 this.errorMessages = errorMessages;
85 String getPropertyFromAJSCbean(String propertyKey) {
86 return PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
89 String getPropertyFromAJSCmap(String propertyKey) {
90 return AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, propertyKey);
93 NsaApiKey getDmaapAuthenticatedUser(DMaaPContext dmaapContext) {
94 return DMaaPAuthenticatorImpl.getAuthenticatedUser(dmaapContext);
97 void respondOk(DMaaPContext context, String msg) {
98 DMaaPResponseBuilder.respondOkWithHtml(context, msg);
101 void respondOk(DMaaPContext context, JSONObject json) throws IOException {
102 DMaaPResponseBuilder.respondOk(context, json);
105 boolean isCadiEnabled() {
106 return Utils.isCadiEnabled();
109 * @param dmaapContext
110 * @throws JSONException
111 * @throws ConfigDbException
112 * @throws IOException
116 public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
117 LOGGER.info("Fetching list of all the topics.");
118 JSONObject json = new JSONObject();
120 JSONArray topicsList = new JSONArray();
122 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
123 topicsList.put(topic.getName());
126 json.put("topics", topicsList);
128 LOGGER.info("Returning list of all the topics.");
129 respondOk(dmaapContext, json);
134 * @param dmaapContext
135 * @throws JSONException
136 * @throws ConfigDbException
137 * @throws IOException
140 public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
142 LOGGER.info("Fetching list of all the topics.");
143 JSONObject json = new JSONObject();
145 JSONArray topicsList = new JSONArray();
147 for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
148 JSONObject obj = new JSONObject();
149 obj.put("topicName", topic.getName());
151 obj.put("owner", topic.getOwner());
152 obj.put("txenabled", topic.isTransactionEnabled());
156 json.put("topics", topicsList);
158 LOGGER.info("Returning list of all the topics.");
159 respondOk(dmaapContext, json);
164 * @param dmaapContext
166 * @throws ConfigDbException
167 * @throws IOException
168 * @throws TopicExistsException
171 public void getTopic(DMaaPContext dmaapContext, String topicName)
172 throws ConfigDbException, IOException, TopicExistsException {
174 LOGGER.info("Fetching details of topic " + topicName);
175 Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
178 LOGGER.error("Topic [" + topicName + "] does not exist.");
179 throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
182 JSONObject o = new JSONObject();
183 o.put("name", t.getName());
184 o.put("description", t.getDescription());
186 if (null != t.getOwners())
187 o.put("owner", t.getOwners().iterator().next());
188 if (null != t.getReaderAcl())
189 o.put("readerAcl", aclToJson(t.getReaderAcl()));
190 if (null != t.getWriterAcl())
191 o.put("writerAcl", aclToJson(t.getWriterAcl()));
193 LOGGER.info("Returning details of topic " + topicName);
194 respondOk(dmaapContext, o);
199 * @param dmaapContext
201 * @throws CambriaApiException
202 * @throws AccessDeniedException
203 * @throws IOException
204 * @throws TopicExistsException
205 * @throws JSONException
211 public void createTopic(DMaaPContext dmaapContext, TopicBean topicBean) throws CambriaApiException, IOException {
212 String topicName = topicBean.getTopicName();
213 LOGGER.info("Creating topic {}",topicName);
214 String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
217 final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions");
218 final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas");
220 final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
221 key, partitions, replicas, topicBean.isTransactionEnabled());
223 LOGGER.info("Topic {} created successfully. Sending response", topicName);
224 respondOk(dmaapContext, topicToJson(t));
225 } catch (JSONException ex) {
227 LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
228 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
229 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
230 LOGGER.info(errRes.toString());
231 throw new CambriaApiException(errRes);
233 } catch (ConfigDbException ex) {
235 LOGGER.error("Failed to create topic "+ topicName +". Config DB Exception", ex);
236 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
237 DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
238 LOGGER.info(errRes.toString());
239 throw new CambriaApiException(errRes);
240 } catch (Broker1.TopicExistsException ex) {
241 LOGGER.error( "Failed to create topic "+ topicName +". Topic already exists.",ex);
245 private String authorizeClient(DMaaPContext dmaapContext, String topicName, String operation) throws DMaaPAccessDeniedException {
246 String clientId = Strings.EMPTY;
247 if(isCadiEnabled() && isTopicWithEnforcedAuthorization(topicName)) {
248 LOGGER.info("Performing AAF authorization for topic {} creation.", topicName);
249 String permission = buildPermission(topicName, operation);
250 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
251 clientId = getAAFclientId(dmaapContext.getRequest());
253 if (!aaf.aafAuthentication(dmaapContext.getRequest(), permission)) {
254 LOGGER.error("Failed to {} topic {}. Authorization failed for client {} and permission {}",
255 operation, topicName, clientId, permission);
256 throw new DMaaPAccessDeniedException(new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
257 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
258 "Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission));
260 } else if(operation.equals(TOPIC_CREATE_OP)){
261 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
262 clientId = (user != null) ? user.getKey() : Strings.EMPTY;
267 private String getAAFclientId(HttpServletRequest request) {
268 Principal principal = request.getUserPrincipal();
269 if (principal !=null) {
270 return principal.getName();
272 LOGGER.warn("Performing AAF authorization but user has not been provided in request.");
277 private boolean isTopicWithEnforcedAuthorization(String topicName) {
278 String enfTopicNamespace = getPropertyFromAJSCbean("enforced.topic.name.AAF");
279 return enfTopicNamespace != null && topicName.startsWith(enfTopicNamespace);
282 int getValueOrDefault(int value, String defaultProperty) {
283 int returnValue = value;
284 if (returnValue <= 0) {
285 String defaultValue = getPropertyFromAJSCmap(defaultProperty);
286 returnValue = StringUtils.isNotEmpty(defaultValue) ? NumberUtils.toInt(defaultValue) : 1;
287 returnValue = (returnValue <= 0) ? 1 : returnValue;
292 private String buildPermission(String topicName, String operation) {
293 String nameSpace = (topicName.indexOf('.') > 1) ?
294 topicName.substring(0, topicName.lastIndexOf('.')) : "";
296 String mrFactoryValue = getPropertyFromAJSCmap("msgRtr.topicfactory.aaf");
297 return mrFactoryValue + nameSpace + "|" + operation;
301 * @param dmaapContext
303 * @throws ConfigDbException
304 * @throws IOException
305 * @throws TopicExistsException
306 * @throws CambriaApiException
307 * @throws AccessDeniedException
310 public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
311 CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
313 LOGGER.info(" Deleting topic " + topicName);
314 authorizeClient(dmaapContext, topicName, "destroy");
316 final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
318 LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
319 throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
322 // metabroker.deleteTopic(topicName);
324 LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
325 respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
330 * @param dmaapContext
333 DMaaPKafkaMetaBroker getMetaBroker(DMaaPContext dmaapContext) {
334 return (DMaaPKafkaMetaBroker) dmaapContext.getConfigReader().getfMetaBroker();
338 * @param dmaapContext
340 * @throws ConfigDbException
341 * @throws IOException
342 * @throws TopicExistsException
346 public void getPublishersByTopicName(DMaaPContext dmaapContext, String topicName)
347 throws ConfigDbException, IOException, TopicExistsException {
348 LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
349 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
352 LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
353 throw new TopicExistsException(
354 "Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
357 final NsaAcl acl = topic.getWriterAcl();
359 LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
360 respondOk(dmaapContext, aclToJson(acl));
369 private static JSONObject aclToJson(NsaAcl acl) {
370 final JSONObject o = new JSONObject();
372 o.put("enabled", false);
373 o.put("users", new JSONArray());
375 o.put("enabled", acl.isActive());
377 final JSONArray a = new JSONArray();
378 for (String user : acl.getUsers()) {
387 * @param dmaapContext
391 public void getConsumersByTopicName(DMaaPContext dmaapContext, String topicName)
392 throws IOException, ConfigDbException, TopicExistsException {
393 LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
394 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
397 LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
398 throw new TopicExistsException(
399 "Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
402 final NsaAcl acl = topic.getReaderAcl();
404 LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
405 respondOk(dmaapContext, aclToJson(acl));
414 static JSONObject topicToJson(Topic t) {
415 final JSONObject o = new JSONObject();
417 o.put("name", t.getName());
418 o.put("description", t.getDescription());
419 o.put("owner", t.getOwner());
420 o.put("readerAcl", aclToJson(t.getReaderAcl()));
421 o.put("writerAcl", aclToJson(t.getWriterAcl()));
427 * @param dmaapContext
428 * @param topicName @param producerId @throws
429 * ConfigDbException @throws IOException @throws
430 * TopicExistsException @throws AccessDeniedException @throws
434 public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
435 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
437 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
438 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
442 // LOGGER.info("Authenticating the user, as ACL authentication is not
444 //// String permission =
451 // LOGGER.error("Failed to permit write access to producer [" +
452 // producerId + "] for topic " + topicName
454 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
455 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
456 // errorMessages.getNotPermitted1()+" <Grant publish permissions>
463 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
466 LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
467 + "] does not exist.");
468 throw new TopicExistsException("Failed to permit write access to producer [" + producerId
469 + "] for topic. Topic [" + topicName + "] does not exist.");
472 topic.permitWritesFromUser(producerId, user);
474 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
475 + "]. Sending response.");
476 respondOk(dmaapContext, "Write access has been granted to publisher.");
481 * @param dmaapContext
484 * @throws ConfigDbException
485 * @throws IOException
486 * @throws TopicExistsException
487 * @throws AccessDeniedException
488 * @throws DMaaPAccessDeniedException
492 public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
493 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
494 DMaaPAccessDeniedException {
496 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
497 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
500 //// String permission =
502 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
503 // String permission = aaf.aafPermissionString(topicName, "manage");
504 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
506 // LOGGER.error("Failed to revoke write access to producer [" +
507 // producerId + "] for topic " + topicName
509 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
510 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
511 // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
514 // throw new DMaaPAccessDeniedException(errRes);
519 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
522 LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
523 + "] does not exist.");
524 throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
525 + "] for topic. Topic [" + topicName + "] does not exist.");
528 topic.denyWritesFromUser(producerId, user);
530 LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
531 + "]. Sending response.");
532 respondOk(dmaapContext, "Write access has been revoked for publisher.");
537 * @param dmaapContext
540 * @throws DMaaPAccessDeniedException
543 public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
544 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
545 DMaaPAccessDeniedException {
547 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
548 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
551 //// String permission =
554 // String permission = aaf.aafPermissionString(topicName, "manage");
555 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
557 // LOGGER.error("Failed to permit read access to consumer [" +
558 // consumerId + "] for topic " + topicName
560 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
561 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
562 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
569 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
572 LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
573 + "] does not exist.");
574 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
575 + "] for topic. Topic [" + topicName + "] does not exist.");
578 topic.permitReadsByUser(consumerId, user);
580 LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
581 + "]. Sending response.");
582 respondOk(dmaapContext,
583 "Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
587 * @param dmaapContext
590 * @throws DMaaPAccessDeniedException
593 public void denyConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
594 throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
595 DMaaPAccessDeniedException {
597 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
598 final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
600 //// String permission =
603 // String permission = aaf.aafPermissionString(topicName, "manage");
604 // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
606 // LOGGER.error("Failed to revoke read access to consumer [" +
607 // consumerId + "] for topic " + topicName
609 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
610 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
611 // errorMessages.getNotPermitted1()+" <Grant consume permissions>
614 // throw new DMaaPAccessDeniedException(errRes);
619 Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
622 LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
623 + "] does not exist.");
624 throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
625 + "] for topic. Topic [" + topicName + "] does not exist.");
628 topic.denyReadsByUser(consumerId, user);
630 LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
631 + "]. Sending response.");
632 respondOk(dmaapContext,
633 "Read access has been revoked for consumer [" + consumerId + "] for topic [" + topicName + "].");