Fix for Kafka Consumer is not safe error
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / service / EventsRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22  package org.onap.dmaap.service;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.Date;
27
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30 import javax.ws.rs.GET;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.QueryParam;
36 import javax.ws.rs.core.Context;
37
38 import org.apache.http.HttpStatus;
39 import com.att.eelf.configuration.EELFLogger;
40 import com.att.eelf.configuration.EELFManager;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.stereotype.Component;
44
45 import org.onap.dmaap.dmf.mr.CambriaApiException;
46 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
47 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
48 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
49 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
50 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
51 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
52 import org.onap.dmaap.dmf.mr.service.EventsService;
53 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
54 import org.onap.dmaap.dmf.mr.utils.Utils;
55 import com.att.nsa.configs.ConfigDbException;
56 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
57 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
58 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
59 /**
60  * This class is a CXF REST service which acts 
61  * as gateway for MR Event Service.
62  * @author rajashree.khare
63  *
64  */
65 @Component
66 @Path("/")
67 public class EventsRestService {
68
69         /**
70          * Logger obj
71          */
72         //private Logger log = Logger.getLogger(EventsRestService.class.toString());
73         private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
74         /**
75          * HttpServletRequest obj
76          */
77         @Context
78         private HttpServletRequest request;
79
80         /**
81          * HttpServletResponse obj
82          */
83         @Context
84         private HttpServletResponse response;
85
86
87         /**
88          * Config Reader
89          */
90         @Autowired
91         @Qualifier("configurationReader")
92         private ConfigurationReader configReader;
93
94         @Autowired
95         private EventsService eventsService;
96
97         @Autowired
98         private DMaaPErrorMessages errorMessages;
99         
100         private boolean isOffsetTopicCreated=false;
101
102         /**
103          * This method is used to consume messages.Taking three parameter
104          * topic,consumerGroup and consumerId .Consumer decide to which topic they
105          * want to consume messages.In on consumer Group there might be many
106          * consumer may be present.
107          * 
108          * @param topic
109          *            specify- the topic name
110          * @param consumergroup
111          *            - specify the consumer group
112          * @param consumerid
113          *            -specify the consumer id
114          * 
115          *            handles CambriaApiException | ConfigDbException |
116          *            TopicExistsException | AccessDeniedException |
117          *            UnavailableException | IOException in try catch block
118          * @throws CambriaApiException
119          * 
120          */
121         @GET
122         @Path("/{topic}/{consumergroup}/{consumerid}")
123         public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") 
124         String consumergroup,
125                         @PathParam("consumerid") String consumerid) throws CambriaApiException {
126                 // log.info("Consuming message from topic " + topic );
127                 DMaaPContext dMaaPContext = getDmaapContext();
128                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
129
130                 try {
131
132                         eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
133                 } 
134                 catch (TopicExistsException  e) {
135                         log.error("Error while reading data from topic [" + topic + "].", e);
136
137                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
138                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
139                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
140                                                         consumerid,
141                                         request.getRemoteHost());
142                         log.info(errRes.toString());
143                         throw new CambriaApiException(errRes);
144
145                 }
146                 catch (DMaaPAccessDeniedException | AccessDeniedException  e) {
147                         log.error("Error while reading data from topic [" + topic + "].", e);
148
149                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
150                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
151                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
152                                                         consumerid,
153                                         request.getRemoteHost());
154                         log.info(errRes.toString());
155                         throw new CambriaApiException(errRes);
156
157                 }
158                 
159                 catch (ConfigDbException | UnavailableException | IOException e) {
160                         log.error("Error while reading data from topic [" + topic + "].", e);
161                 
162                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
163                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
164                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
165                                                         consumerid,
166                                         request.getRemoteHost());
167                         log.info(errRes.toString());
168                         throw new CambriaApiException(errRes);
169
170                 }
171         }
172         
173         
174         /**
175          * This method is used to throw an exception back to the client app if CG/CID is not passed
176          *  while consuming messages
177          */
178         @GET
179         @Path("/{topic}")
180         public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
181                 // log.info("Consuming message from topic " + topic );
182                 DMaaPContext dMaaPContext = getDmaapContext();
183                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
184
185                 try {
186
187                         throw new TopicExistsException("Incorrect URL");
188                 } 
189                 catch (TopicExistsException  e) {
190                         log.error("Error while reading data from topic [" + topic + "].", e);
191
192                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
193                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
194                                         );
195                         log.info(errRes.toString());
196                         throw new CambriaApiException(errRes);
197
198                 }
199                 
200         }
201         
202         /**
203          * This method is used to throw an exception back to the client app if CG/CID is not passed
204          *  while consuming messages
205          */
206         @GET
207         @Path("/{topic}/{consumergroup}")
208         public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup") 
209         String consumergroup
210                         ) throws CambriaApiException {
211                 // log.info("Consuming message from topic " + topic );
212                 DMaaPContext dMaaPContext = getDmaapContext();
213                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
214
215                 try {
216
217                         throw new TopicExistsException("Incorrect URL");
218                 } 
219                 catch (TopicExistsException  e) {
220                         log.error("Error while reading data from topic [" + topic + "].", e);
221
222                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
223                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
224                                         );
225                         log.info(errRes.toString());
226                         throw new CambriaApiException(errRes);
227
228                 }
229                 
230         }
231         
232         
233         
234         
235         
236         
237
238         /**
239          * This method is used to publish messages.Taking two parameter topic and
240          * partition.Publisher decide to which topic they want to publish message
241          * and kafka decide to which partition of topic message will send,
242          * 
243          * @param topic
244          * @param msg
245          * @param partitionKey
246          * 
247          *            handles CambriaApiException | ConfigDbException |
248          *            TopicExistsException | AccessDeniedException | IOException in
249          *            try catch block
250          * @throws CambriaApiException
251          */
252
253         @POST
254         @Produces("application/json")
255         @Path("/{topic}")
256         public void pushEvents(@PathParam("topic") String topic, InputStream msg,
257                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
258                 log.info("Publishing message to topic " + topic);
259             
260                if(!isOffsetTopicCreated){
261                    preCreateOffsetTopic(msg);   
262                }
263                 try {
264                         eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
265                 } 
266                 catch ( TopicExistsException  e) {
267                         log.error("Error while publishing to topic [" + topic + "].", e);
268
269                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
270                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
271                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
272                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
273                         log.info(errRes.toString());
274                         throw new CambriaApiException(errRes);
275                 }
276                 catch ( DMaaPAccessDeniedException | AccessDeniedException  e) {
277                         log.error("Error while publishing to topic [" + topic + "].", e);
278
279                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
280                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
281                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
282                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
283                         log.info(errRes.toString());
284                         throw new CambriaApiException(errRes);
285                 }
286                 
287                 
288                 catch (ConfigDbException |   IOException | missingReqdSetting e) {
289                         log.error("Error while publishing to topic [" + topic + "].", e);
290
291                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
292                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
293                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
294                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
295                         log.info(errRes.toString());
296                         throw new CambriaApiException(errRes);
297                 }
298         }
299
300         /**
301          * This method is used to publish messages by passing an optional header
302          * called 'transactionId'. If the 'transactionId' is not provided in the
303          * input then a new transaction object will be created. Else the existing
304          * transaction object will be updated with the counter details.
305          * 
306          * @param topic
307          * @param partitionKey
308          * 
309          *            handles CambriaApiException | ConfigDbException |
310          *            TopicExistsException | AccessDeniedException | IOException in
311          *            try catch block
312          * @throws CambriaApiException
313          */
314         @POST
315         @Produces("application/json")
316         @Path("/transaction/{topic}")
317         public void pushEventsWithTransaction(@PathParam("topic") String topic,
318                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
319                 // log.info("Publishing message with transaction id for topic " + topic
320                 // );
321                 
322
323                 try {
324                         
325                         if(!isOffsetTopicCreated){
326                         preCreateOffsetTopic(request.getInputStream());   
327                       }
328                         
329                         eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), 
330                                         partitionKey,
331                                         Utils.getFormattedDate(new Date()));
332                 } 
333                 
334                 catch ( TopicExistsException  e) {
335                         log.error("Error while publishing to topic [" + topic + "].", e);
336
337                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
338                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
339                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
340                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
341                         log.info(errRes.toString());
342                         throw new CambriaApiException(errRes);
343                 }
344                 catch ( DMaaPAccessDeniedException| AccessDeniedException  e) {
345                         log.error("Error while publishing to topic [" + topic + "].", e);
346
347                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
348                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
349                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
350                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
351                         log.info(errRes.toString());
352                         throw new CambriaApiException(errRes);
353                 }
354                 
355                 catch (ConfigDbException  | IOException | missingReqdSetting  e) {
356                         log.error("Error while publishing to topic : " + topic, e);
357
358                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
359                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
360                                                         + errorMessages.getPublishMsgError() + e.getMessage(), null,
361                                         Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), 
362                                         request.getRemoteHost(),
363                                         null, null);
364                         log.info(errRes.toString());
365                         throw new CambriaApiException(errRes);
366
367                 }
368         }
369
370         /**
371          * This method is used for taking Configuration Object,HttpServletRequest
372          * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
373          * Object.
374          * 
375          * @return DMaaPContext object from where user can get Configuration
376          *         Object,HttpServlet Object
377          * 
378          */
379         private DMaaPContext getDmaapContext() {
380
381                 DMaaPContext dmaapContext = new DMaaPContext();
382                 dmaapContext.setRequest(request);
383                 dmaapContext.setResponse(response);
384                 dmaapContext.setConfigReader(configReader);
385
386                 return dmaapContext;
387         }
388         
389         private void preCreateOffsetTopic(InputStream msg) {
390
391                 try {
392                         eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null);
393                         eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1");
394                         isOffsetTopicCreated = true;
395                 } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException
396                                 | missingReqdSetting | UnavailableException e) {
397                         log.error("Error while creating the dummy topic", e);
398                 }
399
400         }
401
402 }