[DMAAP-MR] Remove acl check on topics
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / service / TopicRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *
21  *******************************************************************************/
22 package org.onap.dmaap.service;
23
24 import com.att.eelf.configuration.EELFLogger;
25 import com.att.eelf.configuration.EELFManager;
26 import com.att.nsa.configs.ConfigDbException;
27 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
28 import java.io.IOException;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import javax.ws.rs.Consumes;
32 import javax.ws.rs.DELETE;
33 import javax.ws.rs.GET;
34 import javax.ws.rs.POST;
35 import javax.ws.rs.PUT;
36 import javax.ws.rs.Path;
37 import javax.ws.rs.PathParam;
38 import javax.ws.rs.core.Context;
39 import javax.ws.rs.core.MediaType;
40 import org.apache.http.HttpStatus;
41 import org.json.JSONException;
42 import org.onap.dmaap.dmf.mr.CambriaApiException;
43 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
44 import org.onap.dmaap.dmf.mr.beans.TopicBean;
45 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
47 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
48 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
49 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
50 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
51 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
52 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
53 import org.onap.dmaap.dmf.mr.service.TopicService;
54 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import org.springframework.beans.factory.annotation.Autowired;
57 import org.springframework.beans.factory.annotation.Qualifier;
58 import org.springframework.stereotype.Component;
59
60 /**
61  * This class is a CXF REST service which acts 
62  * as gateway for MR Topic Service.
63  * @author Ramkumar Sembaiyan
64  *
65  */
66
67 @Component
68 @Path("/")
69 public class TopicRestService {
70
71         /**
72          * Logger obj
73          */
74         private static final EELFLogger LOGGER = EELFManager.getLogger(TopicRestService.class);
75         private static final String READ = " read ";
76         /**
77          * Config Reader
78          */
79         @Autowired
80         @Qualifier("configurationReader")
81         private ConfigurationReader configReader;
82
83         /**
84          * HttpServletRequest obj
85          */
86         @Context
87         private HttpServletRequest request;
88
89         /**
90          * HttpServletResponse obj
91          */
92         @Context
93         private HttpServletResponse response;
94
95         /**
96          * TopicService obj
97          */
98         @Autowired
99         private TopicService topicService;
100
101         /**
102          * DMaaPErrorMessages obj
103          */
104         @Autowired
105         private DMaaPErrorMessages errorMessages;
106
107         private DMaaPContext getDmaapContext() {
108                 DMaaPContext dmaapContext = new DMaaPContext();
109                 dmaapContext.setRequest(request);
110                 dmaapContext.setResponse(response);
111                 dmaapContext.setConfigReader(configReader);
112                 return dmaapContext;
113         }
114
115         /**
116          * Fetches a list of topics from the current kafka instance and converted
117          * into json object.
118          *
119          * @throws AccessDeniedException
120          * @throws CambriaApiException
121          * @throws IOException
122          * @throws JSONException
123          * */
124         @GET
125         public void getTopics() throws CambriaApiException {
126                 try {
127                         LOGGER.info("Authenticating the user before fetching the topics");
128                         String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
129                         String permission = mrNameS+"|"+"*"+"|"+"view";
130                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
131                         //Check if client is using AAF CADI Basic Authorization
132                         //If yes then check for AAF role authentication else display all topics 
133                         if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER) && !aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
134                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
135                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
136                                                 errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
137                                         LOGGER.info(errRes.toString());
138                                         throw new DMaaPAccessDeniedException(errRes);
139                         }
140                         LOGGER.info("Fetching all Topics");
141                         topicService.getTopics(getDmaapContext());
142                         LOGGER.info("Returning List of all Topics");
143                 } catch (JSONException | ConfigDbException | IOException excp) {
144                         LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
145                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
146                                 DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
147                                 errorMessages.getTopicsfailure() + excp.getMessage());
148                         LOGGER.info(errRes.toString());
149                         throw new CambriaApiException(errRes);
150                 }
151         }
152
153         /**
154          * Fetches a list of topics from the current kafka instance and converted
155          * into json object.
156          *
157          * @return list of the topics in json format
158          * @throws AccessDeniedException
159          * @throws CambriaApiException
160          * @throws IOException
161          * @throws JSONException
162          * */
163         @GET
164         @Path("/listAll")
165         public void getAllTopics() throws CambriaApiException {
166                 try {
167                         LOGGER.info("Authenticating the user before fetching the topics");
168                         String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
169                         String permission = mrNameS+"|"+"*"+"|"+"view";
170                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
171                         //Check if client is using AAF CADI Basic Authorization
172                         //If yes then check for AAF role authentication else display all topics 
173                         if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER)) {
174                                 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
175                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
176                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
177                                                 errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
178                                         LOGGER.info(errRes.toString());
179                                         throw new DMaaPAccessDeniedException(errRes);
180                                 }
181                         }
182                         LOGGER.info("Fetching all Topics");
183                         topicService.getAllTopics(getDmaapContext());
184                         LOGGER.info("Returning List of all Topics");
185                 } catch (JSONException | ConfigDbException | IOException excp) {
186                         LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
187                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
188                                 DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
189                                 errorMessages.getTopicsfailure()+ excp.getMessage());
190                         LOGGER.info(errRes.toString());
191                         throw new CambriaApiException(errRes);
192                 }
193         }
194
195         /**
196          * Returns details of the topic whose name is passed as a parameter
197          *
198          * @param topicName
199          *            - name of the topic
200          * @return details of a topic whose name is mentioned in the request in json
201          *         format.
202          * @throws AccessDeniedException
203          * @throws DMaaPAccessDeniedException
204          * @throws IOException
205          * */
206         @GET
207         @Path("/{topicName}")
208         public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
209                 try {
210                         LOGGER.info("Authenticating the user before fetching the details about topic = {}", topicName);
211                         DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
212                         //Check if client is using AAF CADI Basic Authorization
213                         //If yes then check for AAF role authentication else display all topics 
214                         if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER)) {
215                                 String permission = aaf.aafPermissionString(topicName, "view");
216                                 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
217                                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
218                                                 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
219                                                 errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
220                                         LOGGER.info(errRes.toString());
221                                         throw new DMaaPAccessDeniedException(errRes);
222                                 }
223                         }
224                         LOGGER.info("Fetching Topic: {}", topicName);
225                         topicService.getTopic(getDmaapContext(), topicName);
226                         LOGGER.info("Fetched details of topic: {}", topicName);
227                 } catch (ConfigDbException | IOException | TopicExistsException excp) {
228                         LOGGER.error("Failed to retrieve details of topic: " + topicName, excp);
229                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
230                                 DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
231                                 errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
232                         LOGGER.info(errRes.toString());
233                         throw new CambriaApiException(errRes);
234                 }
235         }
236
237         /**
238          * This method is still not working. Need to check on post call and how to
239          * accept parameters for post call
240          *
241          * @param topicBean
242          *            it will have the bean object
243          * @throws TopicExistsException
244          * @throws CambriaApiException
245          * @throws JSONException
246          * @throws IOException
247          * @throws AccessDeniedException
248          *
249          * */
250         @POST
251         @Path("/create")
252         @Consumes({ MediaType.APPLICATION_JSON })
253         public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
254                 try {
255                         LOGGER.info("Creating Topic."+topicBean.getTopicName());
256                         topicService.createTopic(getDmaapContext(), topicBean);
257                         LOGGER.info("Topic created Successfully.");
258                 } catch (TopicExistsException ex){
259                         LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex);
260                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
261                                 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
262                                 errorMessages.getCreateTopicFail()+ ex.getMessage());
263                         LOGGER.info(errRes.toString());
264                         throw new CambriaApiException(errRes);
265                 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
266                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
267                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
268                                 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
269                                 errorMessages.getCreateTopicFail()+ excp.getMessage());
270                         LOGGER.info(errRes.toString());
271                         throw new CambriaApiException(errRes);
272                 } catch (CambriaApiException |  IOException excp) {
273                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
274                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
275                                 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
276                                 errorMessages.getCreateTopicFail()+ excp.getMessage());
277                         LOGGER.info(errRes.toString());
278                         throw new CambriaApiException(errRes);
279                 }
280         }
281
282         /**
283          * Deletes existing topic whose name is passed as a parameter
284          *
285          * @param topicName
286          *            topic
287          * @throws CambriaApiException
288          * @throws IOException
289          * */
290         @DELETE
291         @Path("/{topicName}")
292         public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
293                 try {
294                         LOGGER.info("Deleting Topic: " + topicName);
295                         topicService.deleteTopic(getDmaapContext(), topicName);
296                         LOGGER.info("Topic [" + topicName + "] deleted successfully.");
297                 } catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
298                         LOGGER.error("Error while deleting a topic: " + excp.getMessage(), excp);
299                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
300                                 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
301                                 errorMessages.getCreateTopicFail()+ excp.getMessage());
302                         LOGGER.info(errRes.toString());
303                         throw new CambriaApiException(errRes);
304                 } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) {
305                         LOGGER.error("Error while deleting topic: " + topicName, excp);
306                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
307                                 DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
308                                 errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
309                         LOGGER.info(errRes.toString());
310                         throw new CambriaApiException(errRes);
311                 }
312         }
313
314         /**
315          * This method will fetch the details of publisher by giving topic name
316          *
317          * @param topicName
318          * @throws CambriaApiException
319          * @throws AccessDeniedException
320          */
321         @GET
322         @Path("/{topicName}/producers")
323         public void getPublishersByTopicName(
324                 @PathParam("topicName") String topicName) throws CambriaApiException {
325                 try {
326                         LOGGER.info("Fetching list of all the publishers for topic " + topicName);
327                         topicService.getPublishersByTopicName(getDmaapContext(), topicName);
328                         LOGGER.info("Returning list of all the publishers for topic " + topicName);
329                 } catch (IOException | ConfigDbException | TopicExistsException excp) {
330                         LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp);
331                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
332                                 DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
333                                 "Error while fetching list of publishers for topic: " + topicName + excp.getMessage());
334                         LOGGER.info(errRes.toString());
335                         throw new CambriaApiException(errRes);
336                 }
337         }
338
339         /**
340          * proving permission for the topic for a particular publisher id
341          *
342          * @param topicName
343          * @param producerId
344          * @throws CambriaApiException
345          */
346         @PUT
347         @Path("/{topicName}/producers/{producerId}")
348         public void permitPublisherForTopic(
349                 @PathParam("topicName") String topicName,
350                 @PathParam("producerId") String producerId) throws CambriaApiException {
351                 try {
352                         LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
353                         topicService.permitPublisherForTopic(getDmaapContext(), topicName, producerId);
354                         LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName);
355                 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
356                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
357                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
358                                 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
359                                 errorMessages.getCreateTopicFail()+ excp.getMessage());
360                         LOGGER.info(errRes.toString());
361                         throw new CambriaApiException(errRes);
362                 } catch ( ConfigDbException | IOException | TopicExistsException excp) {
363                         LOGGER.error("Error while granting write access to producer ["
364                                 + producerId + "] for topic " + topicName, excp);
365                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
366                                 DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
367                                 "Error while granting write access to producer ["
368                                         + producerId + "] for topic " + topicName + excp.getMessage());
369                         LOGGER.info(errRes.toString());
370                         throw new CambriaApiException(errRes);
371                 }
372         }
373
374         /**
375          * Removing access for a publisher id for any particular topic
376          *
377          * @param topicName
378          * @param producerId
379          * @throws CambriaApiException
380          */
381         @DELETE
382         @Path("/{topicName}/producers/{producerId}")
383         public void denyPublisherForTopic(@PathParam("topicName") String topicName,
384                 @PathParam("producerId") String producerId) throws CambriaApiException {
385                 try {
386                         LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
387                         topicService.denyPublisherForTopic(getDmaapContext(), topicName, producerId);
388                         LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName);
389                 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
390                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
391                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
392                                 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
393                                 errorMessages.getCreateTopicFail()+ excp.getMessage());
394                         LOGGER.info(errRes.toString());
395                         throw new CambriaApiException(errRes);
396                 } catch ( ConfigDbException | IOException | TopicExistsException excp) {
397                         LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName, excp);
398                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
399                                 DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
400                                 "Error while revoking write access to producer ["
401                                         + producerId + "] for topic " + topicName + excp.getMessage());
402                         LOGGER.info(errRes.toString());
403                         throw new CambriaApiException(errRes);
404                 }
405         }
406
407         /**
408          * Get the consumer details by the topic name
409          *
410          * @param topicName
411          * @throws CambriaApiException
412          */
413         @GET
414         @Path("/{topicName}/consumers")
415         public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException {
416                 try {
417                         LOGGER.info("Fetching list of all consumers for topic " + topicName);
418                         topicService.getConsumersByTopicName(getDmaapContext(), topicName);
419                         LOGGER.info("Returning list of all consumers for topic " + topicName);
420                 } catch (IOException | ConfigDbException | TopicExistsException excp) {
421                         LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp);
422                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
423                                 DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
424                                 "Error while fetching list of all consumers for topic: " + topicName+ excp.getMessage());
425                         LOGGER.info(errRes.toString());
426                         throw new CambriaApiException(errRes);
427                 }
428         }
429
430         /**
431          * providing access for consumer for any particular topic
432          *
433          * @param topicName
434          * @param consumerId
435          * @throws CambriaApiException
436          */
437         @PUT
438         @Path("/{topicName}/consumers/{consumerId}")
439         public void permitConsumerForTopic(
440                 @PathParam("topicName") String topicName,
441                 @PathParam("consumerId") String consumerId) throws CambriaApiException {
442                 try {
443                         LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
444                         topicService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId);
445                         LOGGER.info("Read access granted to consumer [{0}] for topic {1}", consumerId, topicName);
446                 } catch (AccessDeniedException | ConfigDbException | IOException
447                         | TopicExistsException excp) {
448                         LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName, excp);
449                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
450                                 DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
451                                 "Error while granting read access to consumer ["
452                                         + consumerId + "] for topic " + topicName+ excp.getMessage());
453                         LOGGER.info(errRes.toString());
454                         throw new CambriaApiException(errRes);
455                 }
456         }
457
458         /**
459          * Removing access for consumer for any particular topic
460          *
461          * @param topicName
462          * @param consumerId
463          * @throws CambriaApiException
464          */
465         @DELETE
466         @Path("/{topicName}/consumers/{consumerId}")
467         public void denyConsumerForTopic(@PathParam("topicName") String topicName,
468                 @PathParam("consumerId") String consumerId) throws CambriaApiException {
469                 try {
470                         LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
471                         topicService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId);
472                         LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName);
473                 } catch ( ConfigDbException | IOException
474                         | TopicExistsException excp) {
475                         LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName, excp);
476                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
477                                 DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
478                                 "Error while revoking read access to consumer ["
479                                         + consumerId + "] for topic " + topicName+ excp.getMessage());
480                         LOGGER.info(errRes.toString());
481                         throw new CambriaApiException(errRes);
482                 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
483                         LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
484                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
485                                 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
486                                 errorMessages.getCreateTopicFail()+ excp.getMessage());
487                         LOGGER.info(errRes.toString());
488                         throw new CambriaApiException(errRes);
489                 }
490         }
491
492         public TopicService getTopicService() {
493                 return topicService;
494         }
495
496         public void setTopicService(TopicService topicService) {
497                 this.topicService = topicService;
498         }
499
500 }