1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.service;
24 import java.io.IOException;
25 import javax.servlet.http.HttpServletRequest;
26 import javax.servlet.http.HttpServletResponse;
27 import javax.ws.rs.Consumes;
28 import javax.ws.rs.DELETE;
29 import javax.ws.rs.GET;
30 import javax.ws.rs.POST;
31 import javax.ws.rs.PUT;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.core.Context;
36 import javax.ws.rs.core.MediaType;
38 import org.apache.http.HttpStatus;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
43 import org.json.JSONException;
44 import org.springframework.beans.factory.annotation.Autowired;
45 import org.springframework.beans.factory.annotation.Qualifier;
46 import org.springframework.beans.factory.annotation.Value;
47 import org.springframework.stereotype.Component;
48 import org.onap.dmaap.dmf.mr.CambriaApiException;
49 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
50 import org.onap.dmaap.dmf.mr.beans.TopicBean;
51 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
52 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
53 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
54 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
55 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
56 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
57 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
58 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
59 import org.onap.dmaap.dmf.mr.service.TopicService;
60 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
61 import com.att.nsa.configs.ConfigDbException;
62 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
/**
 * CXF REST service that acts as the gateway for the MR Topic Service.
 *
 * @author Ramkumar Sembaiyan
 */
73 public class TopicRestService {
// NOTE(review): apart from @Qualifier on configReader, no injection annotations are
// visible on the fields in this listing — confirm how they are populated in the
// original source.

/** Class-wide logger. */
//private static final Logger LOGGER = Logger .getLogger(TopicRestService.class);
private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(TopicRestService.class);

/** Configuration reader, qualified as the {@code configurationReader} bean; copied into every DMaaPContext. */
@Qualifier("configurationReader")
private ConfigurationReader configReader;

/** HttpServletRequest of the current call; copied into every DMaaPContext. */
private HttpServletRequest request;

/** HttpServletResponse of the current call; copied into every DMaaPContext. */
private HttpServletResponse response;

/** Service that every endpoint of this resource delegates to. */
private TopicService topicService;

/** Source of the message fragments used when building error responses. */
private DMaaPErrorMessages errorMessages;

// The AAF namespace is not held in a field; getTopics()/getAllTopics() read the
// "msgRtr.namespace.aaf" property on each call instead.
//@Value("${msgRtr.namespace.aaf}")
// private String mrNamespace;
119 * Fetches a list of topics from the current kafka instance and converted
122 * @return list of the topics in json format
123 * @throws AccessDeniedException
124 * @throws CambriaApiException
125 * @throws IOException
126 * @throws JSONException
129 //@Produces(MediaType.TEXT_PLAIN)
130 public void getTopics() throws CambriaApiException {
133 LOGGER.info("Authenticating the user before fetching the topics");
134 //String permission = "com.att.dmaap.mr.topic|*|view";
135 String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
136 String permission =mrNameS+"|"+"*"+"|"+"view";
137 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
138 //Check if client is using AAF CADI Basic Authorization
139 //If yes then check for AAF role authentication else display all topics
140 if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
142 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
145 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
146 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
147 errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
148 LOGGER.info(errRes.toString());
149 throw new DMaaPAccessDeniedException(errRes);
155 LOGGER.info("Fetching all Topics");
156 //topicService = new com.att.dmf.mr.service.impl.TopicServiceImpl();
157 topicService.getTopics(getDmaapContext());
159 LOGGER.info("Returning List of all Topics");
162 } catch (JSONException | ConfigDbException | IOException excp) {
164 "Failed to retrieve list of all topics: "
165 + excp.getMessage(), excp);
167 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
168 DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
169 errorMessages.getTopicsfailure()+ excp.getMessage());
170 LOGGER.info(errRes.toString());
171 throw new CambriaApiException(errRes);
178 * Fetches a list of topics from the current kafka instance and converted
181 * @return list of the topics in json format
182 * @throws AccessDeniedException
183 * @throws CambriaApiException
184 * @throws IOException
185 * @throws JSONException
189 //@Produces(MediaType.TEXT_PLAIN)
190 public void getAllTopics() throws CambriaApiException {
193 LOGGER.info("Authenticating the user before fetching the topics");
194 //String permission = "com.att.dmaap.mr.topic|*|view";
195 String mrNameS= com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
196 String permission =mrNameS+"|"+"*"+"|"+"view";
197 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
198 //Check if client is using AAF CADI Basic Authorization
199 //If yes then check for AAF role authentication else display all topics
200 if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
202 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
205 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
206 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
207 errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
208 LOGGER.info(errRes.toString());
209 throw new DMaaPAccessDeniedException(errRes);
215 LOGGER.info("Fetching all Topics");
217 topicService.getAllTopics(getDmaapContext());
219 LOGGER.info("Returning List of all Topics");
222 } catch (JSONException | ConfigDbException | IOException excp) {
224 "Failed to retrieve list of all topics: "
225 + excp.getMessage(), excp);
227 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
228 DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
229 errorMessages.getTopicsfailure()+ excp.getMessage());
230 LOGGER.info(errRes.toString());
231 throw new CambriaApiException(errRes);
239 * Returns details of the topic whose name is passed as a parameter
242 * - name of the topic
243 * @return details of a topic whose name is mentioned in the request in json
245 * @throws AccessDeniedException
246 * @throws DMaaPAccessDeniedException
247 * @throws IOException
250 @Path("/{topicName}")
251 //@Produces(MediaType.TEXT_PLAIN)
252 public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
255 LOGGER.info("Authenticating the user before fetching the details about topic = "+ topicName);
256 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
258 //String permission= "com.att.ecomp_test.crm.mr.topic|:topic.com.att.ecomp_test.crm.preDemo|view";
260 //Check if client is using AAF CADI Basic Authorization
261 //If yes then check for AAF role authentication else display all topics
262 if(null!=getDmaapContext().getRequest().getHeader("Authorization"))
264 String permission = aaf.aafPermissionString(topicName, "view");
265 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
268 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
269 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
270 errorMessages.getNotPermitted1()+" read "+errorMessages.getNotPermitted2());
271 LOGGER.info(errRes.toString());
272 throw new DMaaPAccessDeniedException(errRes);
276 LOGGER.info("Fetching Topic: " + topicName);
278 topicService.getTopic(getDmaapContext(), topicName);
280 LOGGER.info("Fetched details of topic: " + topicName);
282 } catch (ConfigDbException | IOException | TopicExistsException excp) {
283 LOGGER.error("Failed to retrieve details of topic: " + topicName,
286 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
287 DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
288 errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
289 LOGGER.info(errRes.toString());
290 throw new CambriaApiException(errRes);
299 * This method is still not working. Need to check on post call and how to
300 * accept parameters for post call
303 * it will have the bean object
304 * @throws TopicExistsException
305 * @throws CambriaApiException
306 * @throws JSONException
307 * @throws IOException
308 * @throws AccessDeniedException
313 @Consumes({ MediaType.APPLICATION_JSON })
314 //@Produces(MediaType.TEXT_PLAIN)
315 public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
317 LOGGER.info("Creating Topic."+topicBean.getTopicName());
319 topicService.createTopic(getDmaapContext(), topicBean);
321 LOGGER.info("Topic created Successfully.");
323 catch (TopicExistsException ex){
325 LOGGER.error("Error while creating a topic: " + ex.getMessage(),
328 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
329 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
330 errorMessages.getCreateTopicFail()+ ex.getMessage());
331 LOGGER.info(errRes.toString());
332 throw new CambriaApiException(errRes);
337 }catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
338 LOGGER.error("Error while creating a topic: " + excp.getMessage(),
341 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
342 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
343 errorMessages.getCreateTopicFail()+ excp.getMessage());
344 LOGGER.info(errRes.toString());
345 throw new CambriaApiException(errRes);
347 }catch (CambriaApiException | IOException excp) {
348 LOGGER.error("Error while creating a topic: " + excp.getMessage(),
351 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
352 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
353 errorMessages.getCreateTopicFail()+ excp.getMessage());
354 LOGGER.info(errRes.toString());
355 throw new CambriaApiException(errRes);
361 * Deletes existing topic whose name is passed as a parameter
365 * @throws CambriaApiException
366 * @throws IOException
369 @Path("/{topicName}")
370 //@Produces(MediaType.TEXT_PLAIN)
371 public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
373 LOGGER.info("Deleting Topic: " + topicName);
375 topicService.deleteTopic(getDmaapContext(), topicName);
377 LOGGER.info("Topic [" + topicName + "] deleted successfully.");
378 } catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
379 LOGGER.error("Error while creating a topic: " + excp.getMessage(),
382 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
383 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
384 errorMessages.getCreateTopicFail()+ excp.getMessage());
385 LOGGER.info(errRes.toString());
386 throw new CambriaApiException(errRes);
388 }catch (IOException | ConfigDbException
389 | CambriaApiException | TopicExistsException excp) {
390 LOGGER.error("Error while deleting topic: " + topicName, excp);
392 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
393 DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
394 errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
395 LOGGER.info(errRes.toString());
396 throw new CambriaApiException(errRes);
401 private DMaaPContext getDmaapContext() {
403 DMaaPContext dmaapContext = new DMaaPContext();
404 dmaapContext.setRequest(request);
405 dmaapContext.setResponse(response);
406 dmaapContext.setConfigReader(configReader);
413 * This method will fetch the details of publisher by giving topic name
416 * @throws CambriaApiException
417 * @throws AccessDeniedException
420 @Path("/{topicName}/producers")
421 //@Produces(MediaType.TEXT_PLAIN)
422 public void getPublishersByTopicName(
423 @PathParam("topicName") String topicName) throws CambriaApiException {
426 // String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"manage";
427 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
428 // String permission = aaf.aafPermissionString(topicName, "view");
429 // if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
431 LOGGER.info("Fetching list of all the publishers for topic "
434 topicService.getPublishersByTopicName(getDmaapContext(), topicName);
436 LOGGER.info("Returning list of all the publishers for topic "
439 // LOGGER.error("Error while fetching list of publishers for topic "+ topicName);
441 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
442 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
443 // errorMessages.getNotPermitted1()+" fetch list of publishers "+errorMessages.getNotPermitted2());
444 // LOGGER.info(errRes);
445 // throw new DMaaPAccessDeniedException(errRes);
449 } catch (IOException | ConfigDbException | TopicExistsException excp) {
450 LOGGER.error("Error while fetching list of publishers for topic "
452 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
453 DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
454 "Error while fetching list of publishers for topic: "
455 + topicName + excp.getMessage());
456 LOGGER.info(errRes.toString());
457 throw new CambriaApiException(errRes);
463 * proving permission for the topic for a particular publisher id
467 * @throws CambriaApiException
470 @Path("/{topicName}/producers/{producerId}")
471 public void permitPublisherForTopic(
472 @PathParam("topicName") String topicName,
473 @PathParam("producerId") String producerId) throws CambriaApiException {
475 LOGGER.info("Granting write access to producer [" + producerId
476 + "] for topic " + topicName);
478 topicService.permitPublisherForTopic(getDmaapContext(), topicName,
481 LOGGER.info("Write access has been granted to producer ["
482 + producerId + "] for topic " + topicName);
483 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
484 LOGGER.error("Error while creating a topic: " + excp.getMessage(),
487 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
488 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
489 errorMessages.getCreateTopicFail()+ excp.getMessage());
490 LOGGER.info(errRes.toString());
491 throw new CambriaApiException(errRes);
493 }catch ( ConfigDbException | IOException
494 | TopicExistsException excp) {
495 LOGGER.error("Error while granting write access to producer ["
496 + producerId + "] for topic " + topicName, excp);
498 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
499 DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
500 "Error while granting write access to producer ["
501 + producerId + "] for topic " + topicName + excp.getMessage());
502 LOGGER.info(errRes.toString());
503 throw new CambriaApiException(errRes);
509 * Removing access for a publisher id for any particular topic
513 * @throws CambriaApiException
516 @Path("/{topicName}/producers/{producerId}")
517 public void denyPublisherForTopic(@PathParam("topicName") String topicName,
518 @PathParam("producerId") String producerId) throws CambriaApiException {
520 LOGGER.info("Revoking write access to producer [" + producerId
521 + "] for topic " + topicName);
523 topicService.denyPublisherForTopic(getDmaapContext(), topicName,
526 LOGGER.info("Write access revoked for producer [" + producerId
527 + "] for topic " + topicName);
528 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
529 LOGGER.error("Error while creating a topic: " + excp.getMessage(),
532 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
533 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
534 errorMessages.getCreateTopicFail()+ excp.getMessage());
535 LOGGER.info(errRes.toString());
536 throw new CambriaApiException(errRes);
538 }catch ( ConfigDbException | IOException
539 | TopicExistsException excp) {
540 LOGGER.error("Error while revoking write access for producer ["
541 + producerId + "] for topic " + topicName, excp);
542 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
543 DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
544 "Error while revoking write access to producer ["
545 + producerId + "] for topic " + topicName + excp.getMessage());
546 LOGGER.info(errRes.toString());
547 throw new CambriaApiException(errRes);
552 * Get the consumer details by the topic name
555 * @throws AccessDeniedException
556 * @throws CambriaApiException
559 @Path("/{topicName}/consumers")
560 //@Produces(MediaType.TEXT_PLAIN)
561 public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws AccessDeniedException,
562 CambriaApiException {
566 // String permission = "com.att.dmaap.mr.topic"+"|"+topicName+"|"+"view";
567 // DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
568 // String permission = aaf.aafPermissionString(topicName, "view");
569 // if(aaf.aafAuthentication(getDmaapContext().getRequest(), permission))
571 LOGGER.info("Fetching list of all consumers for topic " + topicName);
573 topicService.getConsumersByTopicName(getDmaapContext(), topicName);
575 LOGGER.info("Returning list of all consumers for topic "
580 // "Error while fetching list of all consumers for topic "
582 // ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
583 // DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
584 // errorMessages.getNotPermitted1()+" fetch list of consumers "+errorMessages.getNotPermitted2());
585 // LOGGER.info(errRes);
586 // throw new DMaaPAccessDeniedException(errRes);
593 } catch (IOException | ConfigDbException | TopicExistsException excp) {
595 "Error while fetching list of all consumers for topic "
597 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
598 DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
599 "Error while fetching list of all consumers for topic: "
600 + topicName+ excp.getMessage());
601 LOGGER.info(errRes.toString());
602 throw new CambriaApiException(errRes);
608 * providing access for consumer for any particular topic
612 * @throws CambriaApiException
615 @Path("/{topicName}/consumers/{consumerId}")
616 public void permitConsumerForTopic(
617 @PathParam("topicName") String topicName,
618 @PathParam("consumerId") String consumerId) throws CambriaApiException {
620 LOGGER.info("Granting read access to consumer [" + consumerId
621 + "] for topic " + topicName);
623 topicService.permitConsumerForTopic(getDmaapContext(), topicName,
626 LOGGER.info("Read access granted to consumer [" + consumerId
627 + "] for topic " + topicName);
628 } catch (AccessDeniedException | ConfigDbException | IOException
629 | TopicExistsException excp) {
630 LOGGER.error("Error while granting read access to consumer ["
631 + consumerId + "] for topic " + topicName, excp);
632 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
633 DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
634 "Error while granting read access to consumer ["
635 + consumerId + "] for topic " + topicName+ excp.getMessage());
636 LOGGER.info(errRes.toString());
637 throw new CambriaApiException(errRes);
643 * Removing access for consumer for any particular topic
647 * @throws CambriaApiException
650 @Path("/{topicName}/consumers/{consumerId}")
651 public void denyConsumerForTopic(@PathParam("topicName") String topicName,
652 @PathParam("consumerId") String consumerId) throws CambriaApiException {
654 LOGGER.info("Revoking read access to consumer [" + consumerId
655 + "] for topic " + topicName);
657 topicService.denyConsumerForTopic(getDmaapContext(), topicName,
660 LOGGER.info("Read access revoked to consumer [" + consumerId
661 + "] for topic " + topicName);
662 } catch ( ConfigDbException | IOException
663 | TopicExistsException excp) {
664 LOGGER.error("Error while revoking read access to consumer ["
665 + consumerId + "] for topic " + topicName, excp);
666 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
667 DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
668 "Error while revoking read access to consumer ["
669 + consumerId + "] for topic " + topicName+ excp.getMessage());
670 LOGGER.info(errRes.toString());
671 throw new CambriaApiException(errRes);
672 }catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
673 LOGGER.error("Error while creating a topic: " + excp.getMessage(),
676 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
677 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
678 errorMessages.getCreateTopicFail()+ excp.getMessage());
679 LOGGER.info(errRes.toString());
680 throw new CambriaApiException(errRes);
685 public TopicService getTopicService() {
689 public void setTopicService(TopicService topicService) {
690 this.topicService = topicService;