1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.service;
24 import java.io.ByteArrayInputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.Date;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import javax.ws.rs.GET;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.PathParam;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.QueryParam;
37 import javax.ws.rs.core.Context;
39 import org.apache.http.HttpStatus;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.beans.factory.annotation.Qualifier;
44 import org.springframework.stereotype.Component;
46 import org.onap.dmaap.dmf.mr.CambriaApiException;
47 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
48 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
49 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
50 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
51 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
52 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
53 import org.onap.dmaap.dmf.mr.service.EventsService;
54 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
58 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
59 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
61 * This class is a CXF REST service which acts
62 * as gateway for MR Event Service.
63 * @author rajashree.khare
68 public class EventsRestService {
73 //private Logger log = Logger.getLogger(EventsRestService.class.toString());
74 private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
76 * HttpServletRequest obj
79 private HttpServletRequest request;
82 * HttpServletResponse obj
85 private HttpServletResponse response;
92 @Qualifier("configurationReader")
93 private ConfigurationReader configReader;
96 private EventsService eventsService;
99 private DMaaPErrorMessages errorMessages;
103 * This method is used to consume messages.Taking three parameter
104 * topic,consumerGroup and consumerId .Consumer decide to which topic they
105 * want to consume messages.In on consumer Group there might be many
106 * consumer may be present.
109 * specify- the topic name
110 * @param consumergroup
111 * - specify the consumer group
113 * -specify the consumer id
115 * handles CambriaApiException | ConfigDbException |
116 * TopicExistsException | AccessDeniedException |
117 * UnavailableException | IOException in try catch block
118 * @throws CambriaApiException
122 @Path("/{topic}/{consumergroup}/{consumerid}")
123 public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
124 String consumergroup,
125 @PathParam("consumerid") String consumerid) throws CambriaApiException {
126 // log.info("Consuming message from topic " + topic );
127 DMaaPContext dMaaPContext = getDmaapContext();
128 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
132 eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
134 catch (TopicExistsException e) {
135 log.error("Error while reading data from topic [" + topic + "].", e);
137 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
138 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
139 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
141 request.getRemoteHost());
142 log.info(errRes.toString());
143 throw new CambriaApiException(errRes);
146 catch (DMaaPAccessDeniedException | AccessDeniedException e) {
147 log.error("Error while reading data from topic [" + topic + "].", e);
149 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
150 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
151 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
153 request.getRemoteHost());
154 log.info(errRes.toString());
155 throw new CambriaApiException(errRes);
159 catch (ConfigDbException | UnavailableException | IOException e) {
160 log.error("Error while reading data from topic [" + topic + "].", e);
162 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
163 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
164 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
166 request.getRemoteHost());
167 log.info(errRes.toString());
168 throw new CambriaApiException(errRes);
175 * This method is used to throw an exception back to the client app if CG/CID is not passed
176 * while consuming messages
180 public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
181 // log.info("Consuming message from topic " + topic );
182 DMaaPContext dMaaPContext = getDmaapContext();
183 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
187 throw new TopicExistsException("Incorrect URL");
189 catch (TopicExistsException e) {
190 log.error("Error while reading data from topic [" + topic + "].", e);
192 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
193 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
195 log.info(errRes.toString());
196 throw new CambriaApiException(errRes);
203 * This method is used to throw an exception back to the client app if CG/CID is not passed
204 * while consuming messages
207 @Path("/{topic}/{consumergroup}")
208 public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup")
210 ) throws CambriaApiException {
211 // log.info("Consuming message from topic " + topic );
212 DMaaPContext dMaaPContext = getDmaapContext();
213 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
217 throw new TopicExistsException("Incorrect URL");
219 catch (TopicExistsException e) {
220 log.error("Error while reading data from topic [" + topic + "].", e);
222 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
223 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
225 log.info(errRes.toString());
226 throw new CambriaApiException(errRes);
239 * This method is used to publish messages.Taking two parameter topic and
240 * partition.Publisher decide to which topic they want to publish message
241 * and kafka decide to which partition of topic message will send,
245 * @param partitionKey
247 * handles CambriaApiException | ConfigDbException |
248 * TopicExistsException | AccessDeniedException | IOException in
250 * @throws CambriaApiException
254 @Produces("application/json")
256 public void pushEvents(@PathParam("topic") String topic, InputStream msg,
257 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
258 log.info("Publishing message to topic " + topic);
261 eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
263 catch ( TopicExistsException e) {
264 log.error("Error while publishing to topic [" + topic + "].", e);
266 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
267 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
268 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
269 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
270 log.info(errRes.toString());
271 throw new CambriaApiException(errRes);
273 catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
274 log.error("Error while publishing to topic [" + topic + "].", e);
276 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
277 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
278 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
279 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
280 log.info(errRes.toString());
281 throw new CambriaApiException(errRes);
285 catch (ConfigDbException | IOException | missingReqdSetting e) {
286 log.error("Error while publishing to topic [" + topic + "].", e);
288 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
289 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
290 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
291 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
292 log.info(errRes.toString());
293 throw new CambriaApiException(errRes);
298 * This method is used to publish messages by passing an optional header
299 * called 'transactionId'. If the 'transactionId' is not provided in the
300 * input then a new transaction object will be created. Else the existing
301 * transaction object will be updated with the counter details.
304 * @param partitionKey
306 * handles CambriaApiException | ConfigDbException |
307 * TopicExistsException | AccessDeniedException | IOException in
309 * @throws CambriaApiException
312 @Produces("application/json")
313 @Path("/transaction/{topic}")
314 public void pushEventsWithTransaction(@PathParam("topic") String topic,
315 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
316 // log.info("Publishing message with transaction id for topic " + topic
322 eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
324 Utils.getFormattedDate(new Date()));
327 catch ( TopicExistsException e) {
328 log.error("Error while publishing to topic [" + topic + "].", e);
330 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
331 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
332 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
333 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
334 log.info(errRes.toString());
335 throw new CambriaApiException(errRes);
337 catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
338 log.error("Error while publishing to topic [" + topic + "].", e);
340 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
341 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
342 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
343 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
344 log.info(errRes.toString());
345 throw new CambriaApiException(errRes);
348 catch (ConfigDbException | IOException | missingReqdSetting e) {
349 log.error("Error while publishing to topic : " + topic, e);
351 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
352 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
353 + errorMessages.getPublishMsgError() + e.getMessage(), null,
354 Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
355 request.getRemoteHost(),
357 log.info(errRes.toString());
358 throw new CambriaApiException(errRes);
364 * This method is used for taking Configuration Object,HttpServletRequest
365 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
368 * @return DMaaPContext object from where user can get Configuration
369 * Object,HttpServlet Object
372 private DMaaPContext getDmaapContext() {
374 DMaaPContext dmaapContext = new DMaaPContext();
375 dmaapContext.setRequest(request);
376 dmaapContext.setResponse(response);
377 dmaapContext.setConfigReader(configReader);