1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.service;
24 import java.io.IOException;
25 import javax.servlet.http.HttpServletRequest;
26 import javax.servlet.http.HttpServletResponse;
27 import javax.ws.rs.Consumes;
28 import javax.ws.rs.DELETE;
29 import javax.ws.rs.GET;
30 import javax.ws.rs.POST;
31 import javax.ws.rs.PUT;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import javax.ws.rs.core.MediaType;
38 import org.apache.http.HttpStatus;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
43 import org.json.JSONException;
44 import org.springframework.beans.factory.annotation.Autowired;
45 import org.springframework.beans.factory.annotation.Qualifier;
46 import org.springframework.beans.factory.annotation.Value;
47 import org.springframework.stereotype.Component;
48 import com.att.nsa.cambria.CambriaApiException;
49 import com.att.nsa.cambria.beans.DMaaPContext;
50 import com.att.nsa.cambria.beans.TopicBean;
51 import com.att.nsa.cambria.constants.CambriaConstants;
52 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
53 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
54 import com.att.nsa.cambria.exception.DMaaPResponseCode;
55 import com.att.nsa.cambria.exception.ErrorResponse;
56 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
57 import com.att.nsa.cambria.security.DMaaPAAFAuthenticator;
58 import com.att.nsa.cambria.security.DMaaPAAFAuthenticatorImpl;
59 import com.att.nsa.cambria.service.TopicService;
60 import com.att.nsa.cambria.utils.ConfigurationReader;
61 import com.att.nsa.configs.ConfigDbException;
62 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
65 * This class is a CXF REST service which acts as gateway for MR Topic Service.
73 public class TopicRestService {
78 // private static final Logger LOGGER = Logger
79 // .getLogger(TopicRestService.class);
80 private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);
85 @Qualifier("configurationReader")
86 private ConfigurationReader configReader;
89 * HttpServletRequest obj
92 private HttpServletRequest request;
95 * HttpServletResponse obj
98 private HttpServletResponse response;
104 private TopicService tService;
107 * DMaaPErrorMessages obj
110 private DMaaPErrorMessages errorMessages;
112 private DMaaPContext dmaapContext = new DMaaPContext();
117 // @Value("${msgRtr.namespace.aaf}")
118 // private String mrNamespace;
121 * Fetches a list of topics from the current kafka instance and converted
124 * @return list of the topics in json format
125 * @throws AccessDeniedException
126 * @throws CambriaApiException
127 * @throws IOException
128 * @throws JSONException
131 // @Produces(MediaType.TEXT_PLAIN)
132 public void getTopics() throws CambriaApiException {
135 LOGGER.info("Authenticating the user before fetching the topics");
136 // String permission = "com.att.dmaap.mr.topic|*|view";
137 String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
138 "msgRtr.namespace.aaf");
139 String permission = mrNameS + "|" + "*" + "|" + "view";
140 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
141 // Check if client is using AAF CADI Basic Authorization
142 // If yes then check for AAF role authentication else display all
144 if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
145 if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
147 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
148 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
149 errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
150 LOGGER.info(errRes.toString());
151 throw new DMaaPAccessDeniedException(errRes);
156 LOGGER.info("Fetching all Topics");
158 tService.getTopics(getDmaapContext());
160 LOGGER.info("Returning List of all Topics");
162 } catch (JSONException | ConfigDbException | IOException excp) {
163 LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
165 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
166 DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
167 errorMessages.getTopicsfailure() + excp.getMessage());
168 LOGGER.info(errRes.toString());
169 throw new CambriaApiException(errRes);
175 * Fetches a list of topics from the current kafka instance and converted
178 * @return list of the topics in json format
179 * @throws AccessDeniedException
180 * @throws CambriaApiException
181 * @throws IOException
182 * @throws JSONException
186 // @Produces(MediaType.TEXT_PLAIN)
187 public void getAllTopics() throws CambriaApiException {
190 LOGGER.info("Authenticating the user before fetching the topics");
191 // String permission = "com.att.dmaap.mr.topic|*|view";
192 String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,
193 "msgRtr.namespace.aaf");
194 String permission = mrNameS + "|" + "*" + "|" + "view";
195 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
196 // Check if client is using AAF CADI Basic Authorization
197 // If yes then check for AAF role authentication else display all
199 if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
200 if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
202 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
203 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
204 errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
205 LOGGER.info(errRes.toString());
206 throw new DMaaPAccessDeniedException(errRes);
211 LOGGER.info("Fetching all Topics");
213 tService.getAllTopics(getDmaapContext());
215 LOGGER.info("Returning List of all Topics");
217 } catch (JSONException | ConfigDbException | IOException excp) {
218 LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
220 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
221 DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
222 errorMessages.getTopicsfailure() + excp.getMessage());
223 LOGGER.info(errRes.toString());
224 throw new CambriaApiException(errRes);
230 * Returns details of the topic whose name is passed as a parameter
233 * - name of the topic
234 * @return details of a topic whose name is mentioned in the request in json
236 * @throws AccessDeniedException
237 * @throws DMaaPAccessDeniedException
238 * @throws IOException
241 @Path("/{topicName}")
242 // @Produces(MediaType.TEXT_PLAIN)
243 public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
246 LOGGER.info("Authenticating the user before fetching the details about topic = " + topicName);
247 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
249 // String permission=
250 // "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
252 // Check if client is using AAF CADI Basic Authorization
253 // If yes then check for AAF role authentication else display all
255 if (null != getDmaapContext().getRequest().getHeader("Authorization")) {
256 String permission = aaf.aafPermissionString(topicName, "view");
257 if (!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
259 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
260 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
261 errorMessages.getNotPermitted1() + " read " + errorMessages.getNotPermitted2());
262 LOGGER.info(errRes.toString());
263 throw new DMaaPAccessDeniedException(errRes);
267 LOGGER.info("Fetching Topic: " + topicName);
269 tService.getTopic(getDmaapContext(), topicName);
271 LOGGER.info("Fetched details of topic: " + topicName);
273 } catch (ConfigDbException | IOException | TopicExistsException excp) {
274 LOGGER.error("Failed to retrieve details of topic: " + topicName, excp);
276 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
277 DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
278 errorMessages.getTopicDetailsFail() + topicName + excp.getMessage());
279 LOGGER.info(errRes.toString());
280 throw new CambriaApiException(errRes);
286 * This method is still not working. Need to check on post call and how to
287 * accept parameters for post call
290 * it will have the bean object
291 * @throws TopicExistsException
292 * @throws CambriaApiException
293 * @throws JSONException
294 * @throws IOException
295 * @throws AccessDeniedException
300 @Consumes({ MediaType.APPLICATION_JSON })
301 // @Produces(MediaType.TEXT_PLAIN)
302 public void createTopic(TopicBean topicBean) throws CambriaApiException{
304 LOGGER.info("Creating Topic." + topicBean.getTopicName());
306 tService.createTopic(getDmaapContext(), topicBean);
308 LOGGER.info("Topic created Successfully.");
309 } catch (TopicExistsException ex) {
311 LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex);
313 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
314 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
315 errorMessages.getCreateTopicFail() + ex.getMessage());
316 LOGGER.info(errRes.toString());
317 throw new CambriaApiException(errRes);
319 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
320 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
322 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
323 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
324 errorMessages.getCreateTopicFail() + excp.getMessage());
325 LOGGER.info(errRes.toString());
326 throw new CambriaApiException(errRes);
328 } catch (CambriaApiException | IOException excp) {
329 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
331 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
332 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
333 errorMessages.getCreateTopicFail() + excp.getMessage());
334 LOGGER.info(errRes.toString());
335 throw new CambriaApiException(errRes);
341 * Deletes existing topic whose name is passed as a parameter
345 * @throws CambriaApiException
346 * @throws IOException
349 @Path("/{topicName}")
350 // @Produces(MediaType.TEXT_PLAIN)
351 public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
353 LOGGER.info("Deleting Topic: " + topicName);
355 tService.deleteTopic(getDmaapContext(), topicName);
357 LOGGER.info("Topic [" + topicName + "] deleted successfully.");
358 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
359 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
361 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
362 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
363 errorMessages.getCreateTopicFail() + excp.getMessage());
364 LOGGER.info(errRes.toString());
365 throw new CambriaApiException(errRes);
367 } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) {
368 LOGGER.error("Error while deleting topic: " + topicName, excp);
370 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
371 DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
372 errorMessages.getDeleteTopicFail() + topicName + excp.getMessage());
373 LOGGER.info(errRes.toString());
374 throw new CambriaApiException(errRes);
379 private DMaaPContext getDmaapContext() {
381 dmaapContext.setRequest(request);
382 dmaapContext.setResponse(response);
383 dmaapContext.setConfigReader(configReader);
390 * This method will fetch the details of publisher by giving topic name
393 * @throws CambriaApiException
394 * @throws AccessDeniedException
397 @Path("/{topicName}/producers")
398 // @Produces(MediaType.TEXT_PLAIN)
399 public void getPublishersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException {
402 // String permission =
403 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
404 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
405 // String permission = aaf.aafPermissionString(topicName, "view");
406 // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
409 LOGGER.info("Fetching list of all the publishers for topic " + topicName);
411 tService.getPublishersByTopicName(getDmaapContext(), topicName);
413 LOGGER.info("Returning list of all the publishers for topic " + topicName);
415 // LOGGER.error("Error while fetching list of publishers for topic
418 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
419 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
420 // errorMessages.getNotPermitted1()+" fetch list of publishers
421 // "+errorMessages.getNotPermitted2());
422 // LOGGER.info(errRes);
423 // throw new DMaaPAccessDeniedException(errRes);
427 } catch (IOException | ConfigDbException | TopicExistsException excp) {
428 LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp);
429 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
430 DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
431 "Error while fetching list of publishers for topic: " + topicName + excp.getMessage());
432 LOGGER.info(errRes.toString());
433 throw new CambriaApiException(errRes);
439 * proving permission for the topic for a particular publisher id
443 * @throws CambriaApiException
446 @Path("/{topicName}/producers/{producerId}")
447 public void permitPublisherForTopic(@PathParam("topicName") String topicName,
448 @PathParam("producerId") String producerId) throws CambriaApiException {
450 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
452 tService.permitPublisherForTopic(getDmaapContext(), topicName, producerId);
454 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName);
455 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
456 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
458 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
459 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
460 errorMessages.getCreateTopicFail() + excp.getMessage());
461 LOGGER.info(errRes.toString());
462 throw new CambriaApiException(errRes);
464 } catch (ConfigDbException | IOException | TopicExistsException excp) {
465 LOGGER.error("Error while granting write access to producer [" + producerId + "] for topic " + topicName,
468 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
469 DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
470 "Error while granting write access to producer [" + producerId + "] for topic " + topicName
471 + excp.getMessage());
472 LOGGER.info(errRes.toString());
473 throw new CambriaApiException(errRes);
479 * Removing access for a publisher id for any particular topic
483 * @throws CambriaApiException
486 @Path("/{topicName}/producers/{producerId}")
487 public void denyPublisherForTopic(@PathParam("topicName") String topicName,
488 @PathParam("producerId") String producerId) throws CambriaApiException {
490 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
492 tService.denyPublisherForTopic(getDmaapContext(), topicName, producerId);
494 LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName);
495 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
496 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
498 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
499 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
500 errorMessages.getCreateTopicFail() + excp.getMessage());
501 LOGGER.info(errRes.toString());
502 throw new CambriaApiException(errRes);
504 } catch (ConfigDbException | IOException | TopicExistsException excp) {
505 LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName,
507 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
508 DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
509 "Error while revoking write access to producer [" + producerId + "] for topic " + topicName
510 + excp.getMessage());
511 LOGGER.info(errRes.toString());
512 throw new CambriaApiException(errRes);
517 * Get the consumer details by the topic name
520 * @throws AccessDeniedException
521 * @throws CambriaApiException
524 @Path("/{topicName}/consumers")
525 // @Produces(MediaType.TEXT_PLAIN)
526 public void getConsumersByTopicName(@PathParam("topicName") String topicName)
527 throws AccessDeniedException, CambriaApiException {
530 // String permission =
531 // "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
532 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
533 // String permission = aaf.aafPermissionString(topicName, "view");
534 // if(aaf.aafAuthentication(getDmaapContext().getRequest(),
537 LOGGER.info("Fetching list of all consumers for topic " + topicName);
539 tService.getConsumersByTopicName(getDmaapContext(), topicName);
541 LOGGER.info("Returning list of all consumers for topic " + topicName);
545 // "Error while fetching list of all consumers for topic "
547 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
548 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
549 // errorMessages.getNotPermitted1()+" fetch list of consumers
550 // "+errorMessages.getNotPermitted2());
551 // LOGGER.info(errRes);
552 // throw new DMaaPAccessDeniedException(errRes);
557 } catch (IOException | ConfigDbException | TopicExistsException excp) {
558 LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp);
559 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
560 DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
561 "Error while fetching list of all consumers for topic: " + topicName + excp.getMessage());
562 LOGGER.info(errRes.toString());
563 throw new CambriaApiException(errRes);
569 * providing access for consumer for any particular topic
573 * @throws CambriaApiException
576 @Path("/{topicName}/consumers/{consumerId}")
577 public void permitConsumerForTopic(@PathParam("topicName") String topicName,
578 @PathParam("consumerId") String consumerId) throws CambriaApiException {
580 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
582 tService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId);
584 LOGGER.info("Read access granted to consumer [" + consumerId + "] for topic " + topicName);
585 } catch (AccessDeniedException | ConfigDbException | IOException | TopicExistsException excp) {
586 LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName,
588 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
589 DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
590 "Error while granting read access to consumer [" + consumerId + "] for topic " + topicName
591 + excp.getMessage());
592 LOGGER.info(errRes.toString());
593 throw new CambriaApiException(errRes);
599 * Removing access for consumer for any particular topic
603 * @throws CambriaApiException
606 @Path("/{topicName}/consumers/{consumerId}")
607 public void denyConsumerForTopic(@PathParam("topicName") String topicName,
608 @PathParam("consumerId") String consumerId) throws CambriaApiException {
610 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
612 tService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId);
614 LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName);
615 } catch (ConfigDbException | IOException | TopicExistsException excp) {
616 LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName,
618 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
619 DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
620 "Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName
621 + excp.getMessage());
622 LOGGER.info(errRes.toString());
623 throw new CambriaApiException(errRes);
624 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
625 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
627 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
628 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
629 errorMessages.getCreateTopicFail() + excp.getMessage());
630 LOGGER.info(errRes.toString());
631 throw new CambriaApiException(errRes);