1 /*******************************************************************************
2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 *******************************************************************************/
22 package org.onap.dmaap.service;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.Date;
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30 import javax.ws.rs.GET;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.QueryParam;
36 import javax.ws.rs.core.Context;
38 import org.apache.http.HttpStatus;
39 import com.att.eelf.configuration.EELFLogger;
40 import com.att.eelf.configuration.EELFManager;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.stereotype.Component;
45 import org.onap.dmaap.dmf.mr.CambriaApiException;
46 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
47 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
48 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
49 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
50 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
51 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
52 import org.onap.dmaap.dmf.mr.service.EventsService;
53 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
54 import org.onap.dmaap.dmf.mr.utils.Utils;
55 import com.att.nsa.configs.ConfigDbException;
56 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
57 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
58 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
60 * This class is a CXF REST service which acts
61 * as gateway for MR Event Service.
62 * @author rajashree.khare
67 public class EventsRestService {
72 //private Logger log = Logger.getLogger(EventsRestService.class.toString());
73 private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
75 * HttpServletRequest obj
78 private HttpServletRequest request;
81 * HttpServletResponse obj
84 private HttpServletResponse response;
91 @Qualifier("configurationReader")
92 private ConfigurationReader configReader;
95 private EventsService eventsService;
98 private DMaaPErrorMessages errorMessages;
100 private boolean isOffsetTopicCreated=false;
103 * This method is used to consume messages.Taking three parameter
104 * topic,consumerGroup and consumerId .Consumer decide to which topic they
105 * want to consume messages.In on consumer Group there might be many
106 * consumer may be present.
109 * specify- the topic name
110 * @param consumergroup
111 * - specify the consumer group
113 * -specify the consumer id
115 * handles CambriaApiException | ConfigDbException |
116 * TopicExistsException | AccessDeniedException |
117 * UnavailableException | IOException in try catch block
118 * @throws CambriaApiException
122 @Path("/{topic}/{consumergroup}/{consumerid}")
123 public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup")
124 String consumergroup,
125 @PathParam("consumerid") String consumerid) throws CambriaApiException {
126 // log.info("Consuming message from topic " + topic );
127 DMaaPContext dMaaPContext = getDmaapContext();
128 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
132 eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
134 catch (TopicExistsException e) {
135 log.error("Error while reading data from topic [" + topic + "].", e);
137 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
138 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
139 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
141 request.getRemoteHost());
142 log.info(errRes.toString());
143 throw new CambriaApiException(errRes);
146 catch (DMaaPAccessDeniedException | AccessDeniedException e) {
147 log.error("Error while reading data from topic [" + topic + "].", e);
149 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
150 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
151 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
153 request.getRemoteHost());
154 log.info(errRes.toString());
155 throw new CambriaApiException(errRes);
159 catch (ConfigDbException | UnavailableException | IOException e) {
160 log.error("Error while reading data from topic [" + topic + "].", e);
162 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
163 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
164 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null,
166 request.getRemoteHost());
167 log.info(errRes.toString());
168 throw new CambriaApiException(errRes);
175 * This method is used to throw an exception back to the client app if CG/CID is not passed
176 * while consuming messages
180 public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
181 // log.info("Consuming message from topic " + topic );
182 DMaaPContext dMaaPContext = getDmaapContext();
183 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
187 throw new TopicExistsException("Incorrect URL");
189 catch (TopicExistsException e) {
190 log.error("Error while reading data from topic [" + topic + "].", e);
192 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
193 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
195 log.info(errRes.toString());
196 throw new CambriaApiException(errRes);
203 * This method is used to throw an exception back to the client app if CG/CID is not passed
204 * while consuming messages
207 @Path("/{topic}/{consumergroup}")
208 public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup")
210 ) throws CambriaApiException {
211 // log.info("Consuming message from topic " + topic );
212 DMaaPContext dMaaPContext = getDmaapContext();
213 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
217 throw new TopicExistsException("Incorrect URL");
219 catch (TopicExistsException e) {
220 log.error("Error while reading data from topic [" + topic + "].", e);
222 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
223 DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
225 log.info(errRes.toString());
226 throw new CambriaApiException(errRes);
239 * This method is used to publish messages.Taking two parameter topic and
240 * partition.Publisher decide to which topic they want to publish message
241 * and kafka decide to which partition of topic message will send,
245 * @param partitionKey
247 * handles CambriaApiException | ConfigDbException |
248 * TopicExistsException | AccessDeniedException | IOException in
250 * @throws CambriaApiException
254 @Produces("application/json")
256 public void pushEvents(@PathParam("topic") String topic, InputStream msg,
257 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
258 log.info("Publishing message to topic " + topic);
260 if(!isOffsetTopicCreated){
261 preCreateOffsetTopic(msg);
264 eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
266 catch ( TopicExistsException e) {
267 log.error("Error while publishing to topic [" + topic + "].", e);
269 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
270 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
271 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
272 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
273 log.info(errRes.toString());
274 throw new CambriaApiException(errRes);
276 catch ( DMaaPAccessDeniedException | AccessDeniedException e) {
277 log.error("Error while publishing to topic [" + topic + "].", e);
279 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
280 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
281 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
282 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
283 log.info(errRes.toString());
284 throw new CambriaApiException(errRes);
288 catch (ConfigDbException | IOException | missingReqdSetting e) {
289 log.error("Error while publishing to topic [" + topic + "].", e);
291 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
292 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
293 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
294 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
295 log.info(errRes.toString());
296 throw new CambriaApiException(errRes);
301 * This method is used to publish messages by passing an optional header
302 * called 'transactionId'. If the 'transactionId' is not provided in the
303 * input then a new transaction object will be created. Else the existing
304 * transaction object will be updated with the counter details.
307 * @param partitionKey
309 * handles CambriaApiException | ConfigDbException |
310 * TopicExistsException | AccessDeniedException | IOException in
312 * @throws CambriaApiException
315 @Produces("application/json")
316 @Path("/transaction/{topic}")
317 public void pushEventsWithTransaction(@PathParam("topic") String topic,
318 @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
319 // log.info("Publishing message with transaction id for topic " + topic
325 if(!isOffsetTopicCreated){
326 preCreateOffsetTopic(request.getInputStream());
329 eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
331 Utils.getFormattedDate(new Date()));
334 catch ( TopicExistsException e) {
335 log.error("Error while publishing to topic [" + topic + "].", e);
337 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
338 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
339 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
340 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
341 log.info(errRes.toString());
342 throw new CambriaApiException(errRes);
344 catch ( DMaaPAccessDeniedException| AccessDeniedException e) {
345 log.error("Error while publishing to topic [" + topic + "].", e);
347 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
348 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
349 + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
350 Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
351 log.info(errRes.toString());
352 throw new CambriaApiException(errRes);
355 catch (ConfigDbException | IOException | missingReqdSetting e) {
356 log.error("Error while publishing to topic : " + topic, e);
358 ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
359 DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
360 + errorMessages.getPublishMsgError() + e.getMessage(), null,
361 Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request),
362 request.getRemoteHost(),
364 log.info(errRes.toString());
365 throw new CambriaApiException(errRes);
371 * This method is used for taking Configuration Object,HttpServletRequest
372 * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
375 * @return DMaaPContext object from where user can get Configuration
376 * Object,HttpServlet Object
379 private DMaaPContext getDmaapContext() {
381 DMaaPContext dmaapContext = new DMaaPContext();
382 dmaapContext.setRequest(request);
383 dmaapContext.setResponse(response);
384 dmaapContext.setConfigReader(configReader);
389 private void preCreateOffsetTopic(InputStream msg) {
392 eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null);
393 eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1");
394 isOffsetTopicCreated = true;
395 } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException
396 | missingReqdSetting | UnavailableException e) {
397 log.error("Error while creating the dummy topic", e);