1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package com.att.nsa.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.Date;
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30 import javax.ws.rs.GET;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.QueryParam;
36 import javax.ws.rs.core.Context;
38 import org.apache.http.HttpStatus;
39 import com.att.eelf.configuration.EELFLogger;
40 import com.att.eelf.configuration.EELFManager;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.stereotype.Component;
45 import com.att.nsa.cambria.CambriaApiException;
46 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
47 import com.att.nsa.cambria.beans.DMaaPContext;
48 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
49 import com.att.nsa.cambria.exception.DMaaPResponseCode;
50 import com.att.nsa.cambria.exception.ErrorResponse;
51 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
52 import com.att.nsa.cambria.service.EventsService;
53 import com.att.nsa.cambria.utils.ConfigurationReader;
54 import com.att.nsa.cambria.utils.Utils;
55 import com.att.nsa.configs.ConfigDbException;
56 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
57 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
58 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
60 * This class is a CXF REST service which acts
61 * as a gateway for the MR Event Service.
67 public class EventsRestService {
72 //private Logger log = Logger.getLogger(EventsRestService.class.toString());
// Class-wide EELF logger used by every handler below.
73 private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
75 * HttpServletRequest obj
// Servlet request of the current call. The handlers read the remote host and the
// user API key from it when building error responses, and pushEventsWithTransaction
// reads the message body from its input stream.
// NOTE(review): the injection annotation for this field is not visible in this listing — confirm.
78 private HttpServletRequest request;
81 * HttpServletResponse obj
// Servlet response of the current call; in the visible code it is only handed to the
// DMaaPContext built by getDmaapContext().
84 private HttpServletResponse response;
// Configuration reader bean selected by the "configurationReader" qualifier; copied
// into every DMaaPContext.
91 @Qualifier("configurationReader")
92 private ConfigurationReader configReader;
// Service that performs the actual consume (getEvents) and publish (pushEvents) work.
95 private EventsService eventsService;
// Supplies the message prefixes (getConsumeMsgError / getPublishMsgError) used in error responses.
98 private DMaaPErrorMessages errorMessages;
101 * This method is used to consume messages. It takes three parameters:
102 * topic, consumerGroup and consumerId. The consumer decides which topic
103 * it wants to consume messages from. One consumer group may contain many
104 * consumers.
107 * specify- the topic name
108 * @param consumergroup
109 * - specify the consumer group
111 * -specify the consumer id
113 * handles CambriaApiException | ConfigDbException |
114 * TopicExistsException | AccessDeniedException |
115 * UnavailableException | IOException in try catch block
116 * @throws CambriaApiException
// Consume endpoint: delegates to eventsService.getEvents(...) for the given topic,
// consumer group and consumer id, and converts each failure into a CambriaApiException
// that carries an ErrorResponse.
// NOTE(review): this listing shows no HTTP-verb annotation, no `try {` opener and no
// closing braces for this method (the embedded numbering has gaps, e.g. 117-119,
// 127-129, 131) — presumably lines dropped by whatever produced the listing; confirm
// against the real file before changing any code here.
120 @Path("/{topic}/{consumergroup}/{consumerid}")
121 public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
122 String consumergroup,
123 @PathParam("consumerid") String consumerid) throws CambriaApiException {
124 // log.info("Consuming message from topic " + topic );
// Build the per-request context and record the current time (formatted by
// Utils.getFormattedDate) as the consumer request time.
125 DMaaPContext dMaaPContext = getDmaapContext();
126 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
130 eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
// TopicExistsException -> ErrorResponse with HTTP 409 (SC_CONFLICT).
132 catch (TopicExistsException e) {
// Logged twice: here at ERROR with the stack trace, below at INFO with the ErrorResponse text.
133 log.error("Error while reading data from topic [" + topic + "].", e);
// NOTE(review): only nine constructor arguments are visible in this listing, while the
// publish handlers pass ten to ErrorResponse; embedded line 138 is absent, so one
// argument (presumably the consumer id) is likely missing here — TODO confirm.
135 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
136 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
137 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
139 request.getRemoteHost());
140 log.info(errRes.toString());
// The caught exception is not passed to CambriaApiException; only its message survives inside errRes.
141 throw new CambriaApiException(errRes);
// Access-denied failures -> ErrorResponse with HTTP 401 (SC_UNAUTHORIZED).
144 catch (DMaaPAccessDeniedException | AccessDeniedException e) {
145 log.error("Error while reading data from topic [" + topic + "].", e);
// Same argument layout as the 409 case above (embedded line 150 is absent).
147 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
148 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
149 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
151 request.getRemoteHost());
152 log.info(errRes.toString());
153 throw new CambriaApiException(errRes);
// Config-DB, consumer-unavailable and I/O failures -> ErrorResponse with HTTP 404 (SC_NOT_FOUND).
// NOTE(review): these look like server-side failures rather than "not found" — confirm
// that 404 is the intended status before relying on it.
157 catch (ConfigDbException | UnavailableException | IOException e) {
158 log.error("Error while reading data from topic [" + topic + "].", e);
// Same argument layout as the 409 case above (embedded line 163 is absent).
160 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
161 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
162 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
164 request.getRemoteHost());
165 log.info(errRes.toString());
166 throw new CambriaApiException(errRes);
172 * This method is used to publish messages. It takes two parameters: topic and
173 * partition key. The publisher decides which topic to publish the message to,
174 * and Kafka decides which partition of the topic the message is sent to.
178 * @param partitionKey
180 * handles CambriaApiException | ConfigDbException |
181 * TopicExistsException | AccessDeniedException | IOException in
183 * @throws CambriaApiException
// Publish endpoint: forwards the request body stream and optional partitionKey query
// parameter to eventsService.pushEvents(...) and converts each failure into a
// CambriaApiException that carries an ErrorResponse. Declared to produce JSON.
// NOTE(review): this listing shows no HTTP-verb or @Path annotation, no `try {` opener
// and no closing braces for this method (the embedded numbering has gaps, e.g. 184-186,
// 188, 192-193, 195) — presumably dropped lines; confirm against the real file.
187 @Produces("application/json")
189 public void pushEvents(@PathParam("topic") String topic, InputStream msg,
190 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
191 log.info("Publishing message to topic " + topic);
// The last argument is passed as null here; pushEventsWithTransaction passes a
// formatted timestamp in the same position.
194 eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
// TopicExistsException -> ErrorResponse with HTTP 409 (SC_CONFLICT).
196 catch ( TopicExistsException e) {
// Logged twice: here at ERROR with the stack trace, below at INFO with the ErrorResponse text.
197 log.error("Error while publishing to topic [" + topic + "].", e);
// The error response carries the caller's API key and remote host taken from the request.
199 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
200 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
201 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
202 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
203 log.info(errRes.toString());
// The caught exception is not passed to CambriaApiException; only its message survives inside errRes.
204 throw new CambriaApiException(errRes);
// Access-denied failures -> ErrorResponse with HTTP 401 (SC_UNAUTHORIZED).
206 catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
207 log.error("Error while publishing to topic [" + topic + "].", e);
209 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
210 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
211 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
212 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
213 log.info(errRes.toString());
214 throw new CambriaApiException(errRes);
// Config-DB, I/O and missing-required-setting failures -> ErrorResponse with HTTP 404 (SC_NOT_FOUND).
// NOTE(review): these look like server-side failures rather than "not found" — confirm
// that 404 is the intended status before relying on it.
218 catch (ConfigDbException | IOException | missingReqdSetting e) {
219 log.error("Error while publishing to topic [" + topic + "].", e);
221 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
222 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
223 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
224 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
225 log.info(errRes.toString());
226 throw new CambriaApiException(errRes);
231 * This method is used to publish messages by passing an optional header
232 * called 'transactionId'. If the 'transactionId' is not provided in the
233 * input then a new transaction object will be created. Else the existing
234 * transaction object will be updated with the counter details.
237 * @param partitionKey
239 * handles CambriaApiException | ConfigDbException |
240 * TopicExistsException | AccessDeniedException | IOException in
242 * @throws CambriaApiException
// Transactional publish endpoint under /transaction/{topic}: reads the message body
// from the servlet request's input stream (there is no InputStream parameter), forwards
// it to eventsService.pushEvents(...), and converts each failure into a
// CambriaApiException that carries an ErrorResponse. Declared to produce JSON.
// NOTE(review): this listing shows no HTTP-verb annotation, no `try {` opener and no
// closing braces for this method (the embedded numbering has gaps, e.g. 243-244,
// 250-252, 254, 256-257, 287) — presumably dropped lines; confirm against the real file.
245 @Produces("application/json")
246 @Path("/transaction/{topic}")
247 public void pushEventsWithTransaction(@PathParam("topic") String topic,
248 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
249 // log.info("Publishing message with transaction id for topic " + topic
// NOTE(review): only four arguments are visible in the call below, while pushEvents
// passes five to the same service method; embedded line 254 is absent, so one argument
// (presumably partitionKey) is likely missing from this listing — TODO confirm.
// The final argument is the current time formatted by Utils.getFormattedDate.
253 eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
255 Utils.getFormattedDate(new Date()));
// TopicExistsException -> ErrorResponse with HTTP 409 (SC_CONFLICT).
258 catch ( TopicExistsException e) {
// Logged twice: here at ERROR with the stack trace, below at INFO with the ErrorResponse text.
259 log.error("Error while publishing to topic [" + topic + "].", e);
261 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
262 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
263 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
264 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
265 log.info(errRes.toString());
// The caught exception is not passed to CambriaApiException; only its message survives inside errRes.
266 throw new CambriaApiException(errRes);
// Access-denied failures -> ErrorResponse with HTTP 401 (SC_UNAUTHORIZED).
268 catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
269 log.error("Error while publishing to topic [" + topic + "].", e);
271 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
272 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
273 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
274 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
275 log.info(errRes.toString());
276 throw new CambriaApiException(errRes);
// Config-DB, I/O and missing-required-setting failures -> ErrorResponse with HTTP 404
// (SC_NOT_FOUND). Unlike the other branches, the log text uses "topic : <name>" and the
// response message is prefixed with "Transaction-".
// NOTE(review): these look like server-side failures rather than "not found" — confirm
// that 404 is the intended status before relying on it.
279 catch (ConfigDbException | IOException | missingReqdSetting e) {
280 log.error("Error while publishing to topic : " + topic, e);
// NOTE(review): the constructor call below ends on a trailing comma in this listing;
// embedded line 287 is absent and presumably holds the remaining arguments and the
// closing parenthesis — TODO confirm.
282 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
283 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
284 + errorMessages.getPublishMsgError() + e.getMessage(), null,
285 Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
286 request.getRemoteHost(),
288 log.info(errRes.toString());
289 throw new CambriaApiException(errRes);
295 * This method is used for taking the Configuration object, the HttpServletRequest
296 * object, the HttpServletResponse object and the HttpServletSession
299 * @return DMaaPContext object from which the user can get the Configuration
300 * object and the HttpServlet objects
// Creates a new DMaaPContext on every call and fills it with this resource's request,
// response and configReader fields.
// NOTE(review): the return statement and the closing brace are not visible in this
// listing (it ends at embedded line 308); presumably the populated context is returned — confirm.
303 private DMaaPContext getDmaapContext() {
305 DMaaPContext dmaapContext = new DMaaPContext();
306 dmaapContext.setRequest(request);
307 dmaapContext.setResponse(response);
308 dmaapContext.setConfigReader(configReader);