d3abd6b1484f469a958e4b0e9d4c19b906b7f388
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / service / EventsRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22  package org.onap.dmaap.service;
23
24 import java.io.ByteArrayInputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.Date;
28
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import javax.ws.rs.GET;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.PathParam;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.QueryParam;
37 import javax.ws.rs.core.Context;
38
39 import org.apache.http.HttpStatus;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.beans.factory.annotation.Qualifier;
44 import org.springframework.stereotype.Component;
45
46 import org.onap.dmaap.dmf.mr.CambriaApiException;
47 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
48 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
49 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
50 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
51 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
52 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
53 import org.onap.dmaap.dmf.mr.service.EventsService;
54 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
58 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
59 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
60 /**
61  * This class is a CXF REST service which acts 
62  * as gateway for MR Event Service.
63  * @author rajashree.khare
64  *
65  */
66 @Component
67 @Path("/")
68 public class EventsRestService {
69
70         /**
71          * Logger obj
72          */
73         //private Logger log = Logger.getLogger(EventsRestService.class.toString());
74         private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
75         /**
76          * HttpServletRequest obj
77          */
78         @Context
79         private HttpServletRequest request;
80
81         /**
82          * HttpServletResponse obj
83          */
84         @Context
85         private HttpServletResponse response;
86
87
88         /**
89          * Config Reader
90          */
91         @Autowired
92         @Qualifier("configurationReader")
93         private ConfigurationReader configReader;
94
95         @Autowired
96         private EventsService eventsService;
97
98         @Autowired
99         private DMaaPErrorMessages errorMessages;
100         
101
102         /**
103          * This method is used to consume messages.Taking three parameter
104          * topic,consumerGroup and consumerId .Consumer decide to which topic they
105          * want to consume messages.In on consumer Group there might be many
106          * consumer may be present.
107          * 
108          * @param topic
109          *            specify- the topic name
110          * @param consumergroup
111          *            - specify the consumer group
112          * @param consumerid
113          *            -specify the consumer id
114          * 
115          *            handles CambriaApiException | ConfigDbException |
116          *            TopicExistsException | AccessDeniedException |
117          *            UnavailableException | IOException in try catch block
118          * @throws CambriaApiException
119          * 
120          */
121         @GET
122         @Path("/{topic}/{consumergroup}/{consumerid}")
123         public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") 
124         String consumergroup,
125                         @PathParam("consumerid") String consumerid) throws CambriaApiException {
126                 // log.info("Consuming message from topic " + topic );
127                 DMaaPContext dMaaPContext = getDmaapContext();
128                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
129
130                 try {
131
132                         eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
133                 } 
134                 catch (TopicExistsException  e) {
135                         log.error("Error while reading data from topic [" + topic + "].", e);
136
137                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
138                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
139                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
140                                                         consumerid,
141                                         request.getRemoteHost());
142                         log.info(errRes.toString());
143                         throw new CambriaApiException(errRes);
144
145                 }
146                 catch (DMaaPAccessDeniedException | AccessDeniedException  e) {
147                         log.error("Error while reading data from topic [" + topic + "].", e);
148
149                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
150                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
151                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
152                                                         consumerid,
153                                         request.getRemoteHost());
154                         log.info(errRes.toString());
155                         throw new CambriaApiException(errRes);
156
157                 }
158                 
159                 catch (ConfigDbException | UnavailableException | IOException e) {
160                         log.error("Error while reading data from topic [" + topic + "].", e);
161                 
162                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
163                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
164                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
165                                                         consumerid,
166                                         request.getRemoteHost());
167                         log.info(errRes.toString());
168                         throw new CambriaApiException(errRes);
169
170                 }
171         }
172         
173         
174         /**
175          * This method is used to throw an exception back to the client app if CG/CID is not passed
176          *  while consuming messages
177          */
178         @GET
179         @Path("/{topic}")
180         public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
181                 // log.info("Consuming message from topic " + topic );
182                 DMaaPContext dMaaPContext = getDmaapContext();
183                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
184
185                 try {
186
187                         throw new TopicExistsException("Incorrect URL");
188                 } 
189                 catch (TopicExistsException  e) {
190                         log.error("Error while reading data from topic [" + topic + "].", e);
191
192                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
193                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
194                                         );
195                         log.info(errRes.toString());
196                         throw new CambriaApiException(errRes);
197
198                 }
199                 
200         }
201         
202         /**
203          * This method is used to throw an exception back to the client app if CG/CID is not passed
204          *  while consuming messages
205          */
206         @GET
207         @Path("/{topic}/{consumergroup}")
208         public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup") 
209         String consumergroup
210                         ) throws CambriaApiException {
211                 // log.info("Consuming message from topic " + topic );
212                 DMaaPContext dMaaPContext = getDmaapContext();
213                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
214
215                 try {
216
217                         throw new TopicExistsException("Incorrect URL");
218                 } 
219                 catch (TopicExistsException  e) {
220                         log.error("Error while reading data from topic [" + topic + "].", e);
221
222                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
223                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
224                                         );
225                         log.info(errRes.toString());
226                         throw new CambriaApiException(errRes);
227
228                 }
229                 
230         }
231         
232         
233         
234         
235         
236         
237
238         /**
239          * This method is used to publish messages.Taking two parameter topic and
240          * partition.Publisher decide to which topic they want to publish message
241          * and kafka decide to which partition of topic message will send,
242          * 
243          * @param topic
244          * @param msg
245          * @param partitionKey
246          * 
247          *            handles CambriaApiException | ConfigDbException |
248          *            TopicExistsException | AccessDeniedException | IOException in
249          *            try catch block
250          * @throws CambriaApiException
251          */
252
253         @POST
254         @Produces("application/json")
255         @Path("/{topic}")
256         public void pushEvents(@PathParam("topic") String topic, InputStream msg,
257                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
258                 log.info("Publishing message to topic " + topic);
259             
260                 try {
261                         eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
262                 } 
263                 catch ( TopicExistsException  e) {
264                         log.error("Error while publishing to topic [" + topic + "].", e);
265
266                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
267                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
268                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
269                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
270                         log.info(errRes.toString());
271                         throw new CambriaApiException(errRes);
272                 }
273                 catch ( DMaaPAccessDeniedException | AccessDeniedException  e) {
274                         log.error("Error while publishing to topic [" + topic + "].", e);
275
276                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
277                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
278                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
279                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
280                         log.info(errRes.toString());
281                         throw new CambriaApiException(errRes);
282                 }
283                 
284                 
285                 catch (ConfigDbException |   IOException | missingReqdSetting e) {
286                         log.error("Error while publishing to topic [" + topic + "].", e);
287
288                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
289                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
290                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
291                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
292                         log.info(errRes.toString());
293                         throw new CambriaApiException(errRes);
294                 }
295         }
296
297         /**
298          * This method is used to publish messages by passing an optional header
299          * called 'transactionId'. If the 'transactionId' is not provided in the
300          * input then a new transaction object will be created. Else the existing
301          * transaction object will be updated with the counter details.
302          * 
303          * @param topic
304          * @param partitionKey
305          * 
306          *            handles CambriaApiException | ConfigDbException |
307          *            TopicExistsException | AccessDeniedException | IOException in
308          *            try catch block
309          * @throws CambriaApiException
310          */
311         @POST
312         @Produces("application/json")
313         @Path("/transaction/{topic}")
314         public void pushEventsWithTransaction(@PathParam("topic") String topic,
315                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
316                 // log.info("Publishing message with transaction id for topic " + topic
317                 // );
318                 
319
320                 try {
321                         
322                         eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), 
323                                         partitionKey,
324                                         Utils.getFormattedDate(new Date()));
325                 } 
326                 
327                 catch ( TopicExistsException  e) {
328                         log.error("Error while publishing to topic [" + topic + "].", e);
329
330                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
331                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
332                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
333                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
334                         log.info(errRes.toString());
335                         throw new CambriaApiException(errRes);
336                 }
337                 catch ( DMaaPAccessDeniedException| AccessDeniedException  e) {
338                         log.error("Error while publishing to topic [" + topic + "].", e);
339
340                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
341                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
342                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
343                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
344                         log.info(errRes.toString());
345                         throw new CambriaApiException(errRes);
346                 }
347                 
348                 catch (ConfigDbException  | IOException | missingReqdSetting  e) {
349                         log.error("Error while publishing to topic : " + topic, e);
350
351                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
352                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
353                                                         + errorMessages.getPublishMsgError() + e.getMessage(), null,
354                                         Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), 
355                                         request.getRemoteHost(),
356                                         null, null);
357                         log.info(errRes.toString());
358                         throw new CambriaApiException(errRes);
359
360                 }
361         }
362
363         /**
364          * This method is used for taking Configuration Object,HttpServletRequest
365          * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
366          * Object.
367          * 
368          * @return DMaaPContext object from where user can get Configuration
369          *         Object,HttpServlet Object
370          * 
371          */
372         private DMaaPContext getDmaapContext() {
373
374                 DMaaPContext dmaapContext = new DMaaPContext();
375                 dmaapContext.setRequest(request);
376                 dmaapContext.setResponse(response);
377                 dmaapContext.setConfigReader(configReader);
378
379                 return dmaapContext;
380         }
381         
382
383 }