1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.service;
24 import com.att.eelf.configuration.EELFLogger;
25 import com.att.eelf.configuration.EELFManager;
26 import com.att.nsa.configs.ConfigDbException;
27 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
28 import java.io.IOException;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import javax.ws.rs.Consumes;
32 import javax.ws.rs.DELETE;
33 import javax.ws.rs.GET;
34 import javax.ws.rs.POST;
35 import javax.ws.rs.PUT;
36 import javax.ws.rs.Path;
37 import javax.ws.rs.PathParam;
38 import javax.ws.rs.core.Context;
39 import javax.ws.rs.core.MediaType;
40 import org.apache.http.HttpStatus;
41 import org.json.JSONException;
42 import org.onap.dmaap.dmf.mr.CambriaApiException;
43 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
44 import org.onap.dmaap.dmf.mr.beans.TopicBean;
45 import org.onap.dmaap.dmf.mr.constants.CambriaConstants;
46 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
47 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
48 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
49 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
50 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
51 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticator;
52 import org.onap.dmaap.dmf.mr.security.DMaaPAAFAuthenticatorImpl;
53 import org.onap.dmaap.dmf.mr.service.TopicService;
54 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import org.springframework.beans.factory.annotation.Autowired;
57 import org.springframework.beans.factory.annotation.Qualifier;
58 import org.springframework.stereotype.Component;
61 * This class is a CXF REST service which acts
62 * as gateway for MR Topic Service.
63 * @author Ramkumar Sembaiyan
69 public class TopicRestService {
74 private static final EELFLogger LOGGER = EELFManager.getLogger(TopicRestService.class);
75 private static final String READ = " read ";
80 @Qualifier("configurationReader")
81 private ConfigurationReader configReader;
84 * HttpServletRequest obj
87 private HttpServletRequest request;
90 * HttpServletResponse obj
93 private HttpServletResponse response;
99 private TopicService topicService;
102 * DMaaPErrorMessages obj
105 private DMaaPErrorMessages errorMessages;
107 private DMaaPContext getDmaapContext() {
108 DMaaPContext dmaapContext = new DMaaPContext();
109 dmaapContext.setRequest(request);
110 dmaapContext.setResponse(response);
111 dmaapContext.setConfigReader(configReader);
116 * Fetches a list of topics from the current kafka instance and converted
119 * @throws AccessDeniedException
120 * @throws CambriaApiException
121 * @throws IOException
122 * @throws JSONException
125 public void getTopics() throws CambriaApiException {
127 LOGGER.info("Authenticating the user before fetching the topics");
128 String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
129 String permission = mrNameS+"|"+"*"+"|"+"view";
130 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
131 //Check if client is using AAF CADI Basic Authorization
132 //If yes then check for AAF role authentication else display all topics
133 if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER) && !aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
134 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
135 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
136 errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
137 LOGGER.info(errRes.toString());
138 throw new DMaaPAccessDeniedException(errRes);
140 LOGGER.info("Fetching all Topics");
141 topicService.getTopics(getDmaapContext());
142 LOGGER.info("Returning List of all Topics");
143 } catch (JSONException | ConfigDbException | IOException excp) {
144 LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
145 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
146 DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
147 errorMessages.getTopicsfailure() + excp.getMessage());
148 LOGGER.info(errRes.toString());
149 throw new CambriaApiException(errRes);
154 * Fetches a list of topics from the current kafka instance and converted
157 * @return list of the topics in json format
158 * @throws AccessDeniedException
159 * @throws CambriaApiException
160 * @throws IOException
161 * @throws JSONException
165 public void getAllTopics() throws CambriaApiException {
167 LOGGER.info("Authenticating the user before fetching the topics");
168 String mrNameS = com.att.ajsc.beans.PropertiesMapBean.getProperty(CambriaConstants.msgRtr_prop,"msgRtr.namespace.aaf");
169 String permission = mrNameS+"|"+"*"+"|"+"view";
170 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
171 //Check if client is using AAF CADI Basic Authorization
172 //If yes then check for AAF role authentication else display all topics
173 if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER)) {
174 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
175 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
176 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
177 errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
178 LOGGER.info(errRes.toString());
179 throw new DMaaPAccessDeniedException(errRes);
182 LOGGER.info("Fetching all Topics");
183 topicService.getAllTopics(getDmaapContext());
184 LOGGER.info("Returning List of all Topics");
185 } catch (JSONException | ConfigDbException | IOException excp) {
186 LOGGER.error("Failed to retrieve list of all topics: " + excp.getMessage(), excp);
187 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
188 DMaaPResponseCode.GET_TOPICS_FAIL.getResponseCode(),
189 errorMessages.getTopicsfailure()+ excp.getMessage());
190 LOGGER.info(errRes.toString());
191 throw new CambriaApiException(errRes);
196 * Returns details of the topic whose name is passed as a parameter
199 * - name of the topic
200 * @return details of a topic whose name is mentioned in the request in json
202 * @throws AccessDeniedException
203 * @throws DMaaPAccessDeniedException
204 * @throws IOException
207 @Path("/{topicName}")
208 public void getTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
210 LOGGER.info("Authenticating the user before fetching the details about topic = {}", topicName);
211 DMaaPAAFAuthenticator aaf = new DMaaPAAFAuthenticatorImpl();
212 //Check if client is using AAF CADI Basic Authorization
213 //If yes then check for AAF role authentication else display all topics
214 if(null != getDmaapContext().getRequest().getHeader(Utils.AUTH_HEADER)) {
215 String permission = aaf.aafPermissionString(topicName, "view");
216 if(!aaf.aafAuthentication(getDmaapContext().getRequest(), permission)) {
217 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
218 DMaaPResponseCode.ACCESS_NOT_PERMITTED.getResponseCode(),
219 errorMessages.getNotPermitted1() + READ + errorMessages.getNotPermitted2());
220 LOGGER.info(errRes.toString());
221 throw new DMaaPAccessDeniedException(errRes);
224 LOGGER.info("Fetching Topic: {}", topicName);
225 topicService.getTopic(getDmaapContext(), topicName);
226 LOGGER.info("Fetched details of topic: {}", topicName);
227 } catch (ConfigDbException | IOException | TopicExistsException excp) {
228 LOGGER.error("Failed to retrieve details of topic: " + topicName, excp);
229 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
230 DMaaPResponseCode.GET_TOPICS_DETAILS_FAIL.getResponseCode(),
231 errorMessages.getTopicDetailsFail()+topicName+ excp.getMessage());
232 LOGGER.info(errRes.toString());
233 throw new CambriaApiException(errRes);
238 * This method is still not working. Need to check on post call and how to
239 * accept parameters for post call
242 * it will have the bean object
243 * @throws TopicExistsException
244 * @throws CambriaApiException
245 * @throws JSONException
246 * @throws IOException
247 * @throws AccessDeniedException
252 @Consumes({ MediaType.APPLICATION_JSON })
253 public void createTopic(TopicBean topicBean) throws CambriaApiException, JSONException {
255 LOGGER.info("Creating Topic."+topicBean.getTopicName());
256 topicService.createTopic(getDmaapContext(), topicBean);
257 LOGGER.info("Topic created Successfully.");
258 } catch (TopicExistsException ex){
259 LOGGER.error("Error while creating a topic: " + ex.getMessage(), ex);
260 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
261 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
262 errorMessages.getCreateTopicFail()+ ex.getMessage());
263 LOGGER.info(errRes.toString());
264 throw new CambriaApiException(errRes);
265 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
266 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
267 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
268 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
269 errorMessages.getCreateTopicFail()+ excp.getMessage());
270 LOGGER.info(errRes.toString());
271 throw new CambriaApiException(errRes);
272 } catch (CambriaApiException | IOException excp) {
273 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
274 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
275 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
276 errorMessages.getCreateTopicFail()+ excp.getMessage());
277 LOGGER.info(errRes.toString());
278 throw new CambriaApiException(errRes);
283 * Deletes existing topic whose name is passed as a parameter
287 * @throws CambriaApiException
288 * @throws IOException
291 @Path("/{topicName}")
292 public void deleteTopic(@PathParam("topicName") String topicName) throws CambriaApiException {
294 LOGGER.info("Deleting Topic: " + topicName);
295 topicService.deleteTopic(getDmaapContext(), topicName);
296 LOGGER.info("Topic [" + topicName + "] deleted successfully.");
297 } catch (DMaaPAccessDeniedException| AccessDeniedException excp) {
298 LOGGER.error("Error while deleting a topic: " + excp.getMessage(), excp);
299 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
300 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
301 errorMessages.getCreateTopicFail()+ excp.getMessage());
302 LOGGER.info(errRes.toString());
303 throw new CambriaApiException(errRes);
304 } catch (IOException | ConfigDbException | CambriaApiException | TopicExistsException excp) {
305 LOGGER.error("Error while deleting topic: " + topicName, excp);
306 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
307 DMaaPResponseCode.DELETE_TOPIC_FAIL.getResponseCode(),
308 errorMessages.getDeleteTopicFail()+ topicName + excp.getMessage());
309 LOGGER.info(errRes.toString());
310 throw new CambriaApiException(errRes);
315 * This method will fetch the details of publisher by giving topic name
318 * @throws CambriaApiException
319 * @throws AccessDeniedException
322 @Path("/{topicName}/producers")
323 public void getPublishersByTopicName(
324 @PathParam("topicName") String topicName) throws CambriaApiException {
326 LOGGER.info("Fetching list of all the publishers for topic " + topicName);
327 topicService.getPublishersByTopicName(getDmaapContext(), topicName);
328 LOGGER.info("Returning list of all the publishers for topic " + topicName);
329 } catch (IOException | ConfigDbException | TopicExistsException excp) {
330 LOGGER.error("Error while fetching list of publishers for topic " + topicName, excp);
331 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
332 DMaaPResponseCode.GET_PUBLISHERS_BY_TOPIC.getResponseCode(),
333 "Error while fetching list of publishers for topic: " + topicName + excp.getMessage());
334 LOGGER.info(errRes.toString());
335 throw new CambriaApiException(errRes);
340 * proving permission for the topic for a particular publisher id
344 * @throws CambriaApiException
347 @Path("/{topicName}/producers/{producerId}")
348 public void permitPublisherForTopic(
349 @PathParam("topicName") String topicName,
350 @PathParam("producerId") String producerId) throws CambriaApiException {
352 LOGGER.info("Granting write access to producer [" + producerId + "] for topic " + topicName);
353 topicService.permitPublisherForTopic(getDmaapContext(), topicName, producerId);
354 LOGGER.info("Write access has been granted to producer [" + producerId + "] for topic " + topicName);
355 } catch (AccessDeniedException | DMaaPAccessDeniedException excp) {
356 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
357 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
358 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
359 errorMessages.getCreateTopicFail()+ excp.getMessage());
360 LOGGER.info(errRes.toString());
361 throw new CambriaApiException(errRes);
362 } catch ( ConfigDbException | IOException | TopicExistsException excp) {
363 LOGGER.error("Error while granting write access to producer ["
364 + producerId + "] for topic " + topicName, excp);
365 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
366 DMaaPResponseCode.PERMIT_PUBLISHER_FOR_TOPIC.getResponseCode(),
367 "Error while granting write access to producer ["
368 + producerId + "] for topic " + topicName + excp.getMessage());
369 LOGGER.info(errRes.toString());
370 throw new CambriaApiException(errRes);
375 * Removing access for a publisher id for any particular topic
379 * @throws CambriaApiException
382 @Path("/{topicName}/producers/{producerId}")
383 public void denyPublisherForTopic(@PathParam("topicName") String topicName,
384 @PathParam("producerId") String producerId) throws CambriaApiException {
386 LOGGER.info("Revoking write access to producer [" + producerId + "] for topic " + topicName);
387 topicService.denyPublisherForTopic(getDmaapContext(), topicName, producerId);
388 LOGGER.info("Write access revoked for producer [" + producerId + "] for topic " + topicName);
389 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
390 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
391 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
392 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
393 errorMessages.getCreateTopicFail()+ excp.getMessage());
394 LOGGER.info(errRes.toString());
395 throw new CambriaApiException(errRes);
396 } catch ( ConfigDbException | IOException | TopicExistsException excp) {
397 LOGGER.error("Error while revoking write access for producer [" + producerId + "] for topic " + topicName, excp);
398 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
399 DMaaPResponseCode.REVOKE_PUBLISHER_FOR_TOPIC.getResponseCode(),
400 "Error while revoking write access to producer ["
401 + producerId + "] for topic " + topicName + excp.getMessage());
402 LOGGER.info(errRes.toString());
403 throw new CambriaApiException(errRes);
408 * Get the consumer details by the topic name
411 * @throws CambriaApiException
414 @Path("/{topicName}/consumers")
415 public void getConsumersByTopicName(@PathParam("topicName") String topicName) throws CambriaApiException {
417 LOGGER.info("Fetching list of all consumers for topic " + topicName);
418 topicService.getConsumersByTopicName(getDmaapContext(), topicName);
419 LOGGER.info("Returning list of all consumers for topic " + topicName);
420 } catch (IOException | ConfigDbException | TopicExistsException excp) {
421 LOGGER.error("Error while fetching list of all consumers for topic " + topicName, excp);
422 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
423 DMaaPResponseCode.GET_CONSUMERS_BY_TOPIC.getResponseCode(),
424 "Error while fetching list of all consumers for topic: " + topicName+ excp.getMessage());
425 LOGGER.info(errRes.toString());
426 throw new CambriaApiException(errRes);
431 * providing access for consumer for any particular topic
435 * @throws CambriaApiException
438 @Path("/{topicName}/consumers/{consumerId}")
439 public void permitConsumerForTopic(
440 @PathParam("topicName") String topicName,
441 @PathParam("consumerId") String consumerId) throws CambriaApiException {
443 LOGGER.info("Granting read access to consumer [" + consumerId + "] for topic " + topicName);
444 topicService.permitConsumerForTopic(getDmaapContext(), topicName, consumerId);
445 LOGGER.info("Read access granted to consumer [{0}] for topic {1}", consumerId, topicName);
446 } catch (AccessDeniedException | ConfigDbException | IOException
447 | TopicExistsException excp) {
448 LOGGER.error("Error while granting read access to consumer [" + consumerId + "] for topic " + topicName, excp);
449 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
450 DMaaPResponseCode.PERMIT_CONSUMER_FOR_TOPIC.getResponseCode(),
451 "Error while granting read access to consumer ["
452 + consumerId + "] for topic " + topicName+ excp.getMessage());
453 LOGGER.info(errRes.toString());
454 throw new CambriaApiException(errRes);
459 * Removing access for consumer for any particular topic
463 * @throws CambriaApiException
466 @Path("/{topicName}/consumers/{consumerId}")
467 public void denyConsumerForTopic(@PathParam("topicName") String topicName,
468 @PathParam("consumerId") String consumerId) throws CambriaApiException {
470 LOGGER.info("Revoking read access to consumer [" + consumerId + "] for topic " + topicName);
471 topicService.denyConsumerForTopic(getDmaapContext(), topicName, consumerId);
472 LOGGER.info("Read access revoked to consumer [" + consumerId + "] for topic " + topicName);
473 } catch ( ConfigDbException | IOException
474 | TopicExistsException excp) {
475 LOGGER.error("Error while revoking read access to consumer [" + consumerId + "] for topic " + topicName, excp);
476 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_FORBIDDEN,
477 DMaaPResponseCode.REVOKE_CONSUMER_FOR_TOPIC.getResponseCode(),
478 "Error while revoking read access to consumer ["
479 + consumerId + "] for topic " + topicName+ excp.getMessage());
480 LOGGER.info(errRes.toString());
481 throw new CambriaApiException(errRes);
482 } catch (DMaaPAccessDeniedException | AccessDeniedException excp) {
483 LOGGER.error("Error while creating a topic: " + excp.getMessage(), excp);
484 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
485 DMaaPResponseCode.CREATE_TOPIC_FAIL.getResponseCode(),
486 errorMessages.getCreateTopicFail()+ excp.getMessage());
487 LOGGER.info(errRes.toString());
488 throw new CambriaApiException(errRes);
492 public TopicService getTopicService() {
496 public void setTopicService(TopicService topicService) {
497 this.topicService = topicService;