1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.Date;
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30 import javax.ws.rs.GET;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.QueryParam;
36 import javax.ws.rs.core.Context;
38 import org.apache.http.HttpStatus;
39 import com.att.eelf.configuration.EELFLogger;
40 import com.att.eelf.configuration.EELFManager;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.stereotype.Component;
45 import org.onap.dmaap.dmf.mr.CambriaApiException;
46 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
47 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
48 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
49 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
50 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
51 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
52 import org.onap.dmaap.dmf.mr.service.EventsService;
53 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
54 import org.onap.dmaap.dmf.mr.utils.Utils;
55 import com.att.nsa.configs.ConfigDbException;
56 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
57 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
58 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
60 * This class is a CXF REST service which acts
61 * as gateway for MR Event Service.
62 * @author rajashree.khare
67 public class EventsRestService {
72 //private Logger log = Logger.getLogger(EventsRestService.class.toString());
73 private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
75 * HttpServletRequest obj
78 private HttpServletRequest request;
81 * HttpServletResponse obj
84 private HttpServletResponse response;
91 @Qualifier("configurationReader")
92 private ConfigurationReader configReader;
95 private EventsService eventsService;
98 private DMaaPErrorMessages errorMessages;
101 * This method is used to consume messages.Taking three parameter
102 * topic,consumerGroup and consumerId .Consumer decide to which topic they
103 * want to consume messages.In on consumer Group there might be many
104 * consumer may be present.
107 * specify- the topic name
108 * @param consumergroup
109 * - specify the consumer group
111 * -specify the consumer id
113 * handles CambriaApiException | ConfigDbException |
114 * TopicExistsException | AccessDeniedException |
115 * UnavailableException | IOException in try catch block
116 * @throws CambriaApiException
120 @Path("/{topic}/{consumergroup}/{consumerid}")
121 public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
122 String consumergroup,
123 @PathParam("consumerid") String consumerid) throws CambriaApiException {
124 // log.info("Consuming message from topic " + topic );
125 DMaaPContext dMaaPContext = getDmaapContext();
126 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
130 eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
132 catch (TopicExistsException e) {
133 log.error("Error while reading data from topic [" + topic + "].", e);
135 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
136 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
137 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
139 request.getRemoteHost());
140 log.info(errRes.toString());
141 throw new CambriaApiException(errRes);
144 catch (DMaaPAccessDeniedException | AccessDeniedException e) {
145 log.error("Error while reading data from topic [" + topic + "].", e);
147 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
148 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
149 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
151 request.getRemoteHost());
152 log.info(errRes.toString());
153 throw new CambriaApiException(errRes);
157 catch (ConfigDbException | UnavailableException | IOException e) {
158 log.error("Error while reading data from topic [" + topic + "].", e);
160 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
161 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
162 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
164 request.getRemoteHost());
165 log.info(errRes.toString());
166 throw new CambriaApiException(errRes);
173 * This method is used to throw an exception back to the client app if CG/CID is not passed
174 * while consuming messages
178 public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
179 // log.info("Consuming message from topic " + topic );
180 DMaaPContext dMaaPContext = getDmaapContext();
181 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
185 throw new TopicExistsException("Incorrect URL");
187 catch (TopicExistsException e) {
188 log.error("Error while reading data from topic [" + topic + "].", e);
190 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
191 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
193 log.info(errRes.toString());
194 throw new CambriaApiException(errRes);
201 * This method is used to throw an exception back to the client app if CG/CID is not passed
202 * while consuming messages
205 @Path("/{topic}/{consumergroup}")
206 public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup")
208 ) throws CambriaApiException {
209 // log.info("Consuming message from topic " + topic );
210 DMaaPContext dMaaPContext = getDmaapContext();
211 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
215 throw new TopicExistsException("Incorrect URL");
217 catch (TopicExistsException e) {
218 log.error("Error while reading data from topic [" + topic + "].", e);
220 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
221 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
223 log.info(errRes.toString());
224 throw new CambriaApiException(errRes);
237 * This method is used to publish messages.Taking two parameter topic and
238 * partition.Publisher decide to which topic they want to publish message
239 * and kafka decide to which partition of topic message will send,
243 * @param partitionKey
245 * handles CambriaApiException | ConfigDbException |
246 * TopicExistsException | AccessDeniedException | IOException in
248 * @throws CambriaApiException
252 @Produces("application/json")
254 public void pushEvents(@PathParam("topic") String topic, InputStream msg,
255 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
256 log.info("Publishing message to topic " + topic);
259 eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
261 catch ( TopicExistsException e) {
262 log.error("Error while publishing to topic [" + topic + "].", e);
264 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
265 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
266 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
267 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
268 log.info(errRes.toString());
269 throw new CambriaApiException(errRes);
271 catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
272 log.error("Error while publishing to topic [" + topic + "].", e);
274 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
275 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
276 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
277 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
278 log.info(errRes.toString());
279 throw new CambriaApiException(errRes);
283 catch (ConfigDbException | IOException | missingReqdSetting e) {
284 log.error("Error while publishing to topic [" + topic + "].", e);
286 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
287 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
288 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
289 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
290 log.info(errRes.toString());
291 throw new CambriaApiException(errRes);
296 * This method is used to publish messages by passing an optional header
297 * called 'transactionId'. If the 'transactionId' is not provided in the
298 * input then a new transaction object will be created. Else the existing
299 * transaction object will be updated with the counter details.
302 * @param partitionKey
304 * handles CambriaApiException | ConfigDbException |
305 * TopicExistsException | AccessDeniedException | IOException in
307 * @throws CambriaApiException
310 @Produces("application/json")
311 @Path("/transaction/{topic}")
312 public void pushEventsWithTransaction(@PathParam("topic") String topic,
313 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
314 // log.info("Publishing message with transaction id for topic " + topic
318 eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
320 Utils.getFormattedDate(new Date()));
323 catch ( TopicExistsException e) {
324 log.error("Error while publishing to topic [" + topic + "].", e);
326 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
327 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
328 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
329 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
330 log.info(errRes.toString());
331 throw new CambriaApiException(errRes);
333 catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
334 log.error("Error while publishing to topic [" + topic + "].", e);
336 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
337 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
338 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
339 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
340 log.info(errRes.toString());
341 throw new CambriaApiException(errRes);
344 catch (ConfigDbException | IOException | missingReqdSetting e) {
345 log.error("Error while publishing to topic : " + topic, e);
347 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
348 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
349 + errorMessages.getPublishMsgError() + e.getMessage(), null,
350 Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
351 request.getRemoteHost(),
353 log.info(errRes.toString());
354 throw new CambriaApiException(errRes);
360 * This method is used for taking Configuration Object,HttpServletRequest
361 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
364 * @return DMaaPContext object from where user can get Configuration
365 * Object,HttpServlet Object
368 private DMaaPContext getDmaapContext() {
370 DMaaPContext dmaapContext = new DMaaPContext();
371 dmaapContext.setRequest(request);
372 dmaapContext.setResponse(response);
373 dmaapContext.setConfigReader(configReader);