1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.service;
24 import java.io.ByteArrayInputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.Date;
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import javax.ws.rs.GET;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.PathParam;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.QueryParam;
37 import javax.ws.rs.core.Context;
39 import org.apache.http.HttpStatus;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.beans.factory.annotation.Qualifier;
44 import org.springframework.stereotype.Component;
46 import org.onap.dmaap.dmf.mr.CambriaApiException;
47 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
48 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
49 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
50 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
51 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
52 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
53 import org.onap.dmaap.dmf.mr.service.EventsService;
54 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
58 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
59 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
61 * This class is a CXF REST service which acts
62 * as a gateway for the MR Event Service.
63 * @author rajashree.khare
68 public class EventsRestService {
// Class-wide logger used by every endpoint in this service.
73 private static final EELFLogger log = EELFManager.getLogger(EventsRestService.class);
75 * HttpServletRequest obj
// Current servlet request; read for remote host, request URI, input stream and API key
// when building error responses and when delegating publishes.
78 private HttpServletRequest request;
81 * HttpServletResponse obj
// Current servlet response; only handed to the DMaaPContext built by getDmaapContext().
84 private HttpServletResponse response;
// Configuration reader selected by the "configurationReader" qualifier; handed to the
// DMaaPContext built by getDmaapContext().
91 @Qualifier("configurationReader")
92 private ConfigurationReader configReader;
// Service the endpoints delegate to for consuming (getEvents) and publishing (pushEvents).
95 private EventsService eventsService;
// Supplies the message prefixes (getConsumeMsgError / getPublishMsgError) that are
// prepended to the exception message in error responses.
98 private DMaaPErrorMessages errorMessages;
102 * This method is used to consume messages. It takes three parameters:
103 * topic, consumerGroup and consumerId. The consumer decides which topic
104 * to consume messages from. A single consumer group may contain many
105 * consumers.
108 * - specify the topic name
109 * @param consumergroup
110 * - specify the consumer group
112 * - specify the consumer id
114 * handles CambriaApiException | ConfigDbException |
115 * TopicExistsException | AccessDeniedException |
116 * UnavailableException | IOException in try catch block
117 * @throws CambriaApiException
121 @Path("/{topic}/{consumergroup}/{consumerid}")
122 public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
123 String consumergroup,
124 @PathParam("consumerid") String consumerid) throws CambriaApiException {
// Consume endpoint: builds a request context, stamps it with the formatted current time,
// and delegates to eventsService.getEvents. Every failure caught below is logged, wrapped
// in an ErrorResponse carrying the CONSUME_MSG_ERROR response code, and rethrown as a
// CambriaApiException.
125 log.info("Consuming message from topic " + topic );
126 DMaaPContext dMaaPContext = getDmaapContext();
127 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
130 eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
131 } catch (TopicExistsException e) {
// TopicExistsException is reported to the client as HTTP 409 (SC_CONFLICT).
132 log.error("Error while reading data from topic [" + topic + "].", e);
// Client-facing message = configured consume-error prefix + the exception's own message.
134 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
135 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
136 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
138 request.getRemoteHost());
139 log.info(errRes.toString());
140 throw new CambriaApiException(errRes);
141 } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
// Either access-denied exception type is reported as HTTP 401 (SC_UNAUTHORIZED).
142 log.error("Error while reading data from topic [" + topic + "].", e);
143 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
144 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
145 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
147 request.getRemoteHost());
148 log.info(errRes.toString());
149 throw new CambriaApiException(errRes);
150 } catch (ConfigDbException | UnavailableException | IOException e) {
// Config-DB, consumer-unavailable and I/O failures are all reported as HTTP 404 (SC_NOT_FOUND).
// NOTE(review): these look like server-side failures; confirm 404 is the intended status.
151 log.error("Error while reading data from topic [" + topic + "].", e);
153 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
154 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
155 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
157 request.getRemoteHost());
158 log.info(errRes.toString());
159 throw new CambriaApiException(errRes);
165 * This method is used to throw an exception back to the client app if the consumer group and consumer ID (CG/CID) are not passed
166 * while consuming messages.
170 public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
// Always fails: this overload exists to tell the caller that the URL lacks the consumer
// group and consumer ID. It raises TopicExistsException("Incorrect URL"), catches it
// immediately, and rethrows a CambriaApiException wrapping an HTTP 404 ErrorResponse.
171 // log.info("Consuming message from topic " + topic );
// NOTE(review): the context built here is not referenced again in the visible lines.
172 DMaaPContext dMaaPContext = getDmaapContext();
173 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
// NOTE(review): the exception is thrown only to reach the catch block directly below
// (exception used as control flow).
177 throw new TopicExistsException("Incorrect URL");
178 } catch (TopicExistsException e) {
179 log.error("Error while reading data from topic [" + topic + "].", e);
// The client-facing message names the request URI and the remote host of the caller.
181 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
182 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
184 log.info(errRes.toString());
185 throw new CambriaApiException(errRes);
190 * This method is used to throw an exception back to the client app if the consumer ID (CID) is not passed
191 * while consuming messages.
194 @Path("/{topic}/{consumergroup}")
195 public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup")
196 String consumergroup) throws CambriaApiException {
// Always fails: this overload exists to tell the caller that the URL lacks the consumer ID.
// It raises TopicExistsException("Incorrect URL"), catches it immediately, and rethrows a
// CambriaApiException wrapping an HTTP 404 ErrorResponse.
// NOTE(review): consumergroup is bound from the path but not used in the visible lines.
197 // log.info("Consuming message from topic " + topic );
// NOTE(review): the context built here is not referenced again in the visible lines.
198 DMaaPContext dMaaPContext = getDmaapContext();
199 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
// NOTE(review): the exception is thrown only to reach the catch block directly below
// (exception used as control flow).
201 throw new TopicExistsException("Incorrect URL");
202 } catch (TopicExistsException e) {
203 log.error("Error while reading data from topic [" + topic + "].", e);
// The client-facing message names the request URI and the remote host of the caller.
205 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
206 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
208 log.info(errRes.toString());
209 throw new CambriaApiException(errRes);
214 * This method is used to publish messages. It takes two parameters: topic and
215 * partition. The publisher decides which topic to publish the message to,
216 * and Kafka decides which partition of the topic the message is sent to,
220 * @param partitionKey
222 * handles CambriaApiException | ConfigDbException |
223 * TopicExistsException | AccessDeniedException | IOException in
225 * @throws CambriaApiException
229 @Produces("application/json")
231 public void pushEvents(@PathParam("topic") String topic, InputStream msg,
232 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
// Publish endpoint: delegates the message stream to eventsService.pushEvents. Every failure
// caught below is logged, wrapped in an ErrorResponse carrying the PUBLISH_MSG_ERROR
// response code plus the caller's API key and remote host, and rethrown as a
// CambriaApiException.
233 log.info("Publishing message to topic " + topic);
// The final argument is null here; the transactional variant passes a formatted timestamp
// in its final argument -- presumably a request time, confirm against EventsService.
235 eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
236 } catch ( TopicExistsException e) {
// TopicExistsException is reported to the client as HTTP 409 (SC_CONFLICT).
237 log.error("Error while publishing to topic [" + topic + "].", e);
// Client-facing message = configured publish-error prefix + the exception's own message.
239 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
240 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
241 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
242 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
243 log.info(errRes.toString());
244 throw new CambriaApiException(errRes);
245 } catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
// Either access-denied exception type is reported as HTTP 401 (SC_UNAUTHORIZED).
246 log.error("Error while publishing to topic [" + topic + "].", e);
248 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
249 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
250 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
251 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
252 log.info(errRes.toString());
253 throw new CambriaApiException(errRes);
254 } catch (ConfigDbException | IOException | missingReqdSetting e) {
// Config-DB, I/O and missing-setting failures are all reported as HTTP 404 (SC_NOT_FOUND).
// NOTE(review): these look like server-side failures; confirm 404 is the intended status.
255 log.error("Error while publishing to topic [" + topic + "].", e);
257 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
258 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
259 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
260 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
261 log.info(errRes.toString());
262 throw new CambriaApiException(errRes);
267 * This method is used to publish messages by passing an optional header
268 * called 'transactionId'. If the 'transactionId' is not provided in the
269 * input then a new transaction object will be created. Else the existing
270 * transaction object will be updated with the counter details.
273 * @param partitionKey
275 * handles CambriaApiException | ConfigDbException |
276 * TopicExistsException | AccessDeniedException | IOException in
278 * @throws CambriaApiException
281 @Produces("application/json")
282 @Path("/transaction/{topic}")
283 public void pushEventsWithTransaction(@PathParam("topic") String topic,
284 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
// Transactional publish endpoint: delegates to eventsService.pushEvents. Every failure
// caught below is logged, wrapped in an ErrorResponse carrying the PUBLISH_MSG_ERROR
// response code, and rethrown as a CambriaApiException.
// The message body is read from the servlet request's input stream (there is no
// InputStream parameter on this method), and the final argument is a formatted
// current timestamp.
286 eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
288 Utils.getFormattedDate(new Date()));
289 } catch ( TopicExistsException e) {
// TopicExistsException is reported to the client as HTTP 409 (SC_CONFLICT).
290 log.error("Error while publishing to topic [" + topic + "].", e);
// Client-facing message = configured publish-error prefix + the exception's own message.
292 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
293 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
294 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
295 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
296 log.info(errRes.toString());
297 throw new CambriaApiException(errRes);
298 } catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
// Either access-denied exception type is reported as HTTP 401 (SC_UNAUTHORIZED).
299 log.error("Error while publishing to topic [" + topic + "].", e);
301 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
302 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
303 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
304 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
305 log.info(errRes.toString());
306 throw new CambriaApiException(errRes);
307 } catch (ConfigDbException | IOException | missingReqdSetting e) {
// Config-DB, I/O and missing-setting failures are reported as HTTP 404 (SC_NOT_FOUND).
// Unlike the two branches above, the client-facing message is prefixed with "Transaction-".
// NOTE(review): these look like server-side failures; confirm 404 is the intended status.
308 log.error("Error while publishing to topic : " + topic, e);
310 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
311 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
312 + errorMessages.getPublishMsgError() + e.getMessage(), null,
313 Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
314 request.getRemoteHost(),
316 log.info(errRes.toString());
317 throw new CambriaApiException(errRes);
322 * This method is used for taking Configuration Object,HttpServletRequest
323 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
326 * @return DMaaPContext object from where user can get Configuration
327 * Object,HttpServlet Object
330 private DMaaPContext getDmaapContext() {
332 DMaaPContext dmaapContext = new DMaaPContext();
333 dmaapContext.setRequest(request);
334 dmaapContext.setResponse(response);
335 dmaapContext.setConfigReader(configReader);