*/
package org.onap.dmaap.dmf.mr.service.impl;
+import static org.onap.dmaap.util.DMaaPAuthFilter.isUseCustomAcls;
+
import com.att.ajsc.beans.PropertiesMapBean;
import com.att.ajsc.filemonitor.AJSCPropertiesMap;
import com.att.eelf.configuration.EELFLogger;
public class TopicServiceImpl implements TopicService {
private static final String TOPIC_CREATE_OP = "create";
- private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicServiceImpl.class);
+ private static final EELFLogger LOGGER = EELFManager.getLogger(TopicServiceImpl.class);
@Autowired
private DMaaPErrorMessages errorMessages;
public void getTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
LOGGER.info("Fetching list of all the topics.");
JSONObject json = new JSONObject();
-
JSONArray topicsList = new JSONArray();
-
for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
topicsList.put(topic.getName());
}
-
json.put("topics", topicsList);
-
LOGGER.info("Returning list of all the topics.");
respondOk(dmaapContext, json);
-
}
/**
*
*/
public void getAllTopics(DMaaPContext dmaapContext) throws JSONException, ConfigDbException, IOException {
-
LOGGER.info("Fetching list of all the topics.");
JSONObject json = new JSONObject();
-
JSONArray topicsList = new JSONArray();
-
for (Topic topic : getMetaBroker(dmaapContext).getAllTopics()) {
JSONObject obj = new JSONObject();
obj.put("topicName", topic.getName());
obj.put("txenabled", topic.isTransactionEnabled());
topicsList.put(obj);
}
-
json.put("topics", topicsList);
-
LOGGER.info("Returning list of all the topics.");
respondOk(dmaapContext, json);
-
}
/**
@Override
public void getTopic(DMaaPContext dmaapContext, String topicName)
throws ConfigDbException, IOException, TopicExistsException {
-
LOGGER.info("Fetching details of topic " + topicName);
Topic t = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == t) {
LOGGER.error("Topic [" + topicName + "] does not exist.");
throw new TopicExistsException("Topic [" + topicName + "] does not exist.");
}
-
JSONObject o = new JSONObject();
o.put("name", t.getName());
o.put("description", t.getDescription());
-
if (null != t.getOwners())
o.put("owner", t.getOwners().iterator().next());
if (null != t.getReaderAcl())
o.put("readerAcl", aclToJson(t.getReaderAcl()));
if (null != t.getWriterAcl())
o.put("writerAcl", aclToJson(t.getWriterAcl()));
-
LOGGER.info("Returning details of topic " + topicName);
respondOk(dmaapContext, o);
-
}
/**
String topicName = topicBean.getTopicName();
LOGGER.info("Creating topic {}",topicName);
String key = authorizeClient(dmaapContext, topicName, TOPIC_CREATE_OP);
-
try {
final int partitions = getValueOrDefault(topicBean.getPartitionCount(), "default.partitions");
final int replicas = getValueOrDefault(topicBean.getReplicationCount(), "default.replicas");
-
final Topic t = getMetaBroker(dmaapContext).createTopic(topicName, topicBean.getTopicDescription(),
key, partitions, replicas, topicBean.isTransactionEnabled());
-
LOGGER.info("Topic {} created successfully. Sending response", topicName);
respondOk(dmaapContext, topicToJson(t));
} catch (JSONException ex) {
-
LOGGER.error("Failed to create topic "+ topicName +". Couldn't parse JSON data.", ex);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
LOGGER.info(errRes.toString());
throw new CambriaApiException(errRes);
-
} catch (ConfigDbException ex) {
-
LOGGER.error("Failed to create topic "+ topicName +". Config DB Exception", ex);
ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_BAD_REQUEST,
DMaaPResponseCode.INCORRECT_JSON.getResponseCode(), errorMessages.getIncorrectJson());
DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
"Failed to "+ operation +" topic: Access Denied. User does not have permission to create topic with perm " + permission));
}
- } else if(operation.equals(TOPIC_CREATE_OP)){
+ } else if (operation.equals(TOPIC_CREATE_OP)){
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
clientId = (user != null) ? user.getKey() : Strings.EMPTY;
}
@Override
public void deleteTopic(DMaaPContext dmaapContext, String topicName) throws IOException, ConfigDbException,
CambriaApiException, TopicExistsException, DMaaPAccessDeniedException, AccessDeniedException {
-
LOGGER.info(" Deleting topic " + topicName);
authorizeClient(dmaapContext, topicName, "destroy");
-
final Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
if (topic == null) {
LOGGER.error("Failed to delete topic. Topic [" + topicName + "] does not exist.");
throw new TopicExistsException("Failed to delete topic. Topic [" + topicName + "] does not exist.");
}
-
// metabroker.deleteTopic(topicName);
-
LOGGER.info("Topic [" + topicName + "] deleted successfully. Sending response.");
respondOk(dmaapContext, "Topic [" + topicName + "] deleted successfully");
}
throws ConfigDbException, IOException, TopicExistsException {
LOGGER.info("Retrieving list of all the publishers for topic " + topicName);
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (topic == null) {
LOGGER.error("Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
throw new TopicExistsException(
"Failed to retrieve publishers list for topic. Topic [" + topicName + "] does not exist.");
}
-
final NsaAcl acl = topic.getWriterAcl();
-
LOGGER.info("Returning list of all the publishers for topic " + topicName + ". Sending response.");
respondOk(dmaapContext, aclToJson(acl));
-
}
/**
throws IOException, ConfigDbException, TopicExistsException {
LOGGER.info("Retrieving list of all the consumers for topic " + topicName);
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (topic == null) {
LOGGER.error("Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
throw new TopicExistsException(
"Failed to retrieve consumers list for topic. Topic [" + topicName + "] does not exist.");
}
-
final NsaAcl acl = topic.getReaderAcl();
-
LOGGER.info("Returning list of all the consumers for topic " + topicName + ". Sending response.");
respondOk(dmaapContext, aclToJson(acl));
*/
@Override
public void permitPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
- throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException, CambriaApiException {
-
+ throws AccessDeniedException, ConfigDbException, TopicExistsException {
LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
-
-
- //
- // LOGGER.info("Authenticating the user, as ACL authentication is not
-
- //// String permission =
-
- //
-
-
-
- // {
- // LOGGER.error("Failed to permit write access to producer [" +
- // producerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant publish permissions>
-
-
-
- // }
- // }
-
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == topic) {
LOGGER.error("Failed to permit write access to producer [" + producerId + "] for topic. Topic [" + topicName
+ "] does not exist.");
throw new TopicExistsException("Failed to permit write access to producer [" + producerId
+ "] for topic. Topic [" + topicName + "] does not exist.");
}
-
- topic.permitWritesFromUser(producerId, user);
-
- LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
+ if (isUseCustomAcls()) {
+ topic.permitWritesFromUser(producerId, user);
+ LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic [" + topicName
+ "]. Sending response.");
+ } else {
+ LOGGER.info("Ignoring acl update");
+ }
respondOk(dmaapContext, "Write access has been granted to publisher.");
-
}
/**
public void denyPublisherForTopic(DMaaPContext dmaapContext, String topicName, String producerId)
throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
DMaaPAccessDeniedException {
-
LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
-
- //
- //// String permission =
-
- // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to revoke write access to producer [" +
- // producerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Revoke publish permissions>
-
-
- // throw new DMaaPAccessDeniedException(errRes);
- //
-
- // }
-
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == topic) {
LOGGER.error("Failed to revoke write access to producer [" + producerId + "] for topic. Topic [" + topicName
+ "] does not exist.");
throw new TopicExistsException("Failed to revoke write access to producer [" + producerId
+ "] for topic. Topic [" + topicName + "] does not exist.");
}
-
topic.denyWritesFromUser(producerId, user);
-
LOGGER.info("Write access has been revoked to producer [" + producerId + "] for topic [" + topicName
+ "]. Sending response.");
respondOk(dmaapContext, "Write access has been revoked for publisher.");
-
}
/**
*/
@Override
public void permitConsumerForTopic(DMaaPContext dmaapContext, String topicName, String consumerId)
- throws AccessDeniedException, ConfigDbException, IOException, TopicExistsException,
- DMaaPAccessDeniedException {
+ throws AccessDeniedException, ConfigDbException, TopicExistsException {
LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
-
- //
- //// String permission =
-
-
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to permit read access to consumer [" +
- // consumerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant consume permissions>
-
-
-
- // }
- // }
-
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == topic) {
LOGGER.error("Failed to permit read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
+ "] does not exist.");
throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
+ "] for topic. Topic [" + topicName + "] does not exist.");
}
-
- topic.permitReadsByUser(consumerId, user);
-
- LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
+ if (isUseCustomAcls()) {
+ topic.permitReadsByUser(consumerId, user);
+ LOGGER.info("Read access has been granted to consumer [" + consumerId + "] for topic [" + topicName
+ "]. Sending response.");
+ } else {
+ LOGGER.info("Ignoring acl update");
+ }
respondOk(dmaapContext,
"Read access has been granted for consumer [" + consumerId + "] for topic [" + topicName + "].");
}
LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
final NsaApiKey user = getDmaapAuthenticatedUser(dmaapContext);
-
- //// String permission =
-
-
- // String permission = aaf.aafPermissionString(topicName, "manage");
- // if(!aaf.aafAuthentication(dmaapContext.getRequest(), permission))
- // {
- // LOGGER.error("Failed to revoke read access to consumer [" +
- // consumerId + "] for topic " + topicName
-
- // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
- // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
- // errorMessages.getNotPermitted1()+" <Grant consume permissions>
-
-
- // throw new DMaaPAccessDeniedException(errRes);
- // }
- //
- //
-
Topic topic = getMetaBroker(dmaapContext).getTopic(topicName);
-
if (null == topic) {
LOGGER.error("Failed to revoke read access to consumer [" + consumerId + "] for topic. Topic [" + topicName
+ "] does not exist.");
throw new TopicExistsException("Failed to permit read access to consumer [" + consumerId
+ "] for topic. Topic [" + topicName + "] does not exist.");
}
-
topic.denyReadsByUser(consumerId, user);
-
LOGGER.info("Read access has been revoked to consumer [" + consumerId + "] for topic [" + topicName
+ "]. Sending response.");
respondOk(dmaapContext,