update the package name
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / service / EventsRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22  package org.onap.dmaap.service;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.Date;
27
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30 import javax.ws.rs.GET;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.QueryParam;
36 import javax.ws.rs.core.Context;
37
38 import org.apache.http.HttpStatus;
39 import com.att.eelf.configuration.EELFLogger;
40 import com.att.eelf.configuration.EELFManager;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.stereotype.Component;
44
45 import org.onap.dmaap.dmf.mr.CambriaApiException;
46 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
47 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
48 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
49 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
50 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
51 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
52 import org.onap.dmaap.dmf.mr.service.EventsService;
53 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
54 import org.onap.dmaap.dmf.mr.utils.Utils;
55 import com.att.nsa.configs.ConfigDbException;
56 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
57 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
58 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
59 /**
60  * This class is a CXF REST service which acts 
61  * as gateway for MR Event Service.
62  * @author rajashree.khare
63  *
64  */
65 @Component
66 @Path("/")
67 public class EventsRestService {
68
        /**
         * Class-wide EELF logger used for request and error logging.
         */
        private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
        /**
         * Current HTTP request, supplied through the {@code @Context} annotation.
         * Read for the remote host, request URI, API key and request body.
         */
        @Context
        private HttpServletRequest request;

        /**
         * Current HTTP response, supplied through the {@code @Context} annotation.
         * Only handed on to the {@link DMaaPContext} built by this class.
         */
        @Context
        private HttpServletResponse response;


        /**
         * Configuration reader, autowired from the bean qualified as
         * {@code configurationReader}.
         */
        @Autowired
        @Qualifier("configurationReader")
        private ConfigurationReader configReader;

        /**
         * Service that performs the actual consume and publish operations.
         */
        @Autowired
        private EventsService eventsService;

        /**
         * Source of the message prefixes used when building error responses.
         */
        @Autowired
        private DMaaPErrorMessages errorMessages;
99
100         /**
101          * This method is used to consume messages.Taking three parameter
102          * topic,consumerGroup and consumerId .Consumer decide to which topic they
103          * want to consume messages.In on consumer Group there might be many
104          * consumer may be present.
105          * 
106          * @param topic
107          *            specify- the topic name
108          * @param consumergroup
109          *            - specify the consumer group
110          * @param consumerid
111          *            -specify the consumer id
112          * 
113          *            handles CambriaApiException | ConfigDbException |
114          *            TopicExistsException | AccessDeniedException |
115          *            UnavailableException | IOException in try catch block
116          * @throws CambriaApiException
117          * 
118          */
119         @GET
120         @Path("/{topic}/{consumergroup}/{consumerid}")
121         public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") 
122         String consumergroup,
123                         @PathParam("consumerid") String consumerid) throws CambriaApiException {
124                 // log.info("Consuming message from topic " + topic );
125                 DMaaPContext dMaaPContext = getDmaapContext();
126                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
127
128                 try {
129
130                         eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
131                 } 
132                 catch (TopicExistsException  e) {
133                         log.error("Error while reading data from topic [" + topic + "].", e);
134
135                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
136                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
137                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
138                                                         consumerid,
139                                         request.getRemoteHost());
140                         log.info(errRes.toString());
141                         throw new CambriaApiException(errRes);
142
143                 }
144                 catch (DMaaPAccessDeniedException | AccessDeniedException  e) {
145                         log.error("Error while reading data from topic [" + topic + "].", e);
146
147                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
148                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
149                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
150                                                         consumerid,
151                                         request.getRemoteHost());
152                         log.info(errRes.toString());
153                         throw new CambriaApiException(errRes);
154
155                 }
156                 
157                 catch (ConfigDbException | UnavailableException | IOException e) {
158                         log.error("Error while reading data from topic [" + topic + "].", e);
159                 
160                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
161                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
162                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
163                                                         consumerid,
164                                         request.getRemoteHost());
165                         log.info(errRes.toString());
166                         throw new CambriaApiException(errRes);
167
168                 }
169         }
170         
171         
172         /**
173          * This method is used to throw an exception back to the client app if CG/CID is not passed
174          *  while consuming messages
175          */
176         @GET
177         @Path("/{topic}")
178         public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
179                 // log.info("Consuming message from topic " + topic );
180                 DMaaPContext dMaaPContext = getDmaapContext();
181                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
182
183                 try {
184
185                         throw new TopicExistsException("Incorrect URL");
186                 } 
187                 catch (TopicExistsException  e) {
188                         log.error("Error while reading data from topic [" + topic + "].", e);
189
190                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
191                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
192                                         );
193                         log.info(errRes.toString());
194                         throw new CambriaApiException(errRes);
195
196                 }
197                 
198         }
199         
200         /**
201          * This method is used to throw an exception back to the client app if CG/CID is not passed
202          *  while consuming messages
203          */
204         @GET
205         @Path("/{topic}/{consumergroup}")
206         public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup") 
207         String consumergroup
208                         ) throws CambriaApiException {
209                 // log.info("Consuming message from topic " + topic );
210                 DMaaPContext dMaaPContext = getDmaapContext();
211                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
212
213                 try {
214
215                         throw new TopicExistsException("Incorrect URL");
216                 } 
217                 catch (TopicExistsException  e) {
218                         log.error("Error while reading data from topic [" + topic + "].", e);
219
220                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
221                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
222                                         );
223                         log.info(errRes.toString());
224                         throw new CambriaApiException(errRes);
225
226                 }
227                 
228         }
229         
230         
231         
232         
233         
234         
235
236         /**
237          * This method is used to publish messages.Taking two parameter topic and
238          * partition.Publisher decide to which topic they want to publish message
239          * and kafka decide to which partition of topic message will send,
240          * 
241          * @param topic
242          * @param msg
243          * @param partitionKey
244          * 
245          *            handles CambriaApiException | ConfigDbException |
246          *            TopicExistsException | AccessDeniedException | IOException in
247          *            try catch block
248          * @throws CambriaApiException
249          */
250
251         @POST
252         @Produces("application/json")
253         @Path("/{topic}")
254         public void pushEvents(@PathParam("topic") String topic, InputStream msg,
255                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
256                 log.info("Publishing message to topic " + topic);
257
258                 try {
259                         eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
260                 } 
261                 catch ( TopicExistsException  e) {
262                         log.error("Error while publishing to topic [" + topic + "].", e);
263
264                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
265                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
266                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
267                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
268                         log.info(errRes.toString());
269                         throw new CambriaApiException(errRes);
270                 }
271                 catch ( DMaaPAccessDeniedException | AccessDeniedException  e) {
272                         log.error("Error while publishing to topic [" + topic + "].", e);
273
274                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
275                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
276                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
277                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
278                         log.info(errRes.toString());
279                         throw new CambriaApiException(errRes);
280                 }
281                 
282                 
283                 catch (ConfigDbException |   IOException | missingReqdSetting e) {
284                         log.error("Error while publishing to topic [" + topic + "].", e);
285
286                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
287                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
288                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
289                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
290                         log.info(errRes.toString());
291                         throw new CambriaApiException(errRes);
292                 }
293         }
294
295         /**
296          * This method is used to publish messages by passing an optional header
297          * called 'transactionId'. If the 'transactionId' is not provided in the
298          * input then a new transaction object will be created. Else the existing
299          * transaction object will be updated with the counter details.
300          * 
301          * @param topic
302          * @param partitionKey
303          * 
304          *            handles CambriaApiException | ConfigDbException |
305          *            TopicExistsException | AccessDeniedException | IOException in
306          *            try catch block
307          * @throws CambriaApiException
308          */
309         @POST
310         @Produces("application/json")
311         @Path("/transaction/{topic}")
312         public void pushEventsWithTransaction(@PathParam("topic") String topic,
313                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
314                 // log.info("Publishing message with transaction id for topic " + topic
315                 // );
316
317                 try {
318                         eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), 
319                                         partitionKey,
320                                         Utils.getFormattedDate(new Date()));
321                 } 
322                 
323                 catch ( TopicExistsException  e) {
324                         log.error("Error while publishing to topic [" + topic + "].", e);
325
326                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
327                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
328                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
329                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
330                         log.info(errRes.toString());
331                         throw new CambriaApiException(errRes);
332                 }
333                 catch ( DMaaPAccessDeniedException| AccessDeniedException  e) {
334                         log.error("Error while publishing to topic [" + topic + "].", e);
335
336                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
337                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
338                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
339                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
340                         log.info(errRes.toString());
341                         throw new CambriaApiException(errRes);
342                 }
343                 
344                 catch (ConfigDbException  | IOException | missingReqdSetting  e) {
345                         log.error("Error while publishing to topic : " + topic, e);
346
347                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
348                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
349                                                         + errorMessages.getPublishMsgError() + e.getMessage(), null,
350                                         Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), 
351                                         request.getRemoteHost(),
352                                         null, null);
353                         log.info(errRes.toString());
354                         throw new CambriaApiException(errRes);
355
356                 }
357         }
358
359         /**
360          * This method is used for taking Configuration Object,HttpServletRequest
361          * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
362          * Object.
363          * 
364          * @return DMaaPContext object from where user can get Configuration
365          *         Object,HttpServlet Object
366          * 
367          */
368         private DMaaPContext getDmaapContext() {
369
370                 DMaaPContext dmaapContext = new DMaaPContext();
371                 dmaapContext.setRequest(request);
372                 dmaapContext.setResponse(response);
373                 dmaapContext.setConfigReader(configReader);
374
375                 return dmaapContext;
376         }
377
378 }