Revert package name changes
[dmaap/messagerouter/messageservice.git] / src / main / java / com / att / nsa / dmaap / service / EventsRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.dmaap.service;
23
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.util.Date;
27
28 import javax.servlet.http.HttpServletRequest;
29 import javax.servlet.http.HttpServletResponse;
30 import javax.ws.rs.GET;
31 import javax.ws.rs.POST;
32 import javax.ws.rs.Path;
33 import javax.ws.rs.PathParam;
34 import javax.ws.rs.Produces;
35 import javax.ws.rs.QueryParam;
36 import javax.ws.rs.core.Context;
37
38 import org.apache.http.HttpStatus;
39 import com.att.eelf.configuration.EELFLogger;
40 import com.att.eelf.configuration.EELFManager;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.beans.factory.annotation.Qualifier;
43 import org.springframework.stereotype.Component;
44
45 import com.att.nsa.cambria.CambriaApiException;
46 import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
47 import com.att.nsa.cambria.beans.DMaaPContext;
48 import com.att.nsa.cambria.exception.DMaaPErrorMessages;
49 import com.att.nsa.cambria.exception.DMaaPResponseCode;
50 import com.att.nsa.cambria.exception.ErrorResponse;
51 import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
52 import com.att.nsa.cambria.service.EventsService;
53 import com.att.nsa.cambria.utils.ConfigurationReader;
54 import com.att.nsa.cambria.utils.Utils;
55 import com.att.nsa.configs.ConfigDbException;
56 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
57 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
58 import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
59 /**
60  * This class is a CXF REST service which acts 
61  * as gateway for MR Event Service.
62  * @author author
63  *
64  */
65 @Component
66 @Path("/")
67 public class EventsRestService {
68
69         /**
70          * Logger obj
71          */
72         //private Logger log = Logger.getLogger(EventsRestService.class.toString());
73         private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
74         /**
75          * HttpServletRequest obj
76          */
77         @Context
78         private HttpServletRequest request;
79
80         /**
81          * HttpServletResponse obj
82          */
83         @Context
84         private HttpServletResponse response;
85
86
87         /**
88          * Config Reader
89          */
90         @Autowired
91         @Qualifier("configurationReader")
92         private ConfigurationReader configReader;
93
94         @Autowired
95         private EventsService eventsService;
96
97         @Autowired
98         private DMaaPErrorMessages errorMessages;
99
100         /**
101          * This method is used to consume messages.Taking three parameter
102          * topic,consumerGroup and consumerId .Consumer decide to which topic they
103          * want to consume messages.In on consumer Group there might be many
104          * consumer may be present.
105          * 
106          * @param topic
107          *            specify- the topic name
108          * @param consumergroup
109          *            - specify the consumer group
110          * @param consumerid
111          *            -specify the consumer id
112          * 
113          *            handles CambriaApiException | ConfigDbException |
114          *            TopicExistsException | AccessDeniedException |
115          *            UnavailableException | IOException in try catch block
116          * @throws CambriaApiException
117          * 
118          */
119         @GET
120         @Path("/{topic}/{consumergroup}/{consumerid}")
121         public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") 
122         String consumergroup,
123                         @PathParam("consumerid") String consumerid) throws CambriaApiException {
124                 // log.info("Consuming message from topic " + topic );
125                 DMaaPContext dMaaPContext = getDmaapContext();
126                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
127
128                 try {
129
130                         eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
131                 } 
132                 catch (TopicExistsException  e) {
133                         log.error("Error while reading data from topic [" + topic + "].", e);
134
135                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
136                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
137                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
138                                                         consumerid,
139                                         request.getRemoteHost());
140                         log.info(errRes.toString());
141                         throw new CambriaApiException(errRes);
142
143                 }
144                 catch (DMaaPAccessDeniedException | AccessDeniedException  e) {
145                         log.error("Error while reading data from topic [" + topic + "].", e);
146
147                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
148                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
149                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
150                                                         consumerid,
151                                         request.getRemoteHost());
152                         log.info(errRes.toString());
153                         throw new CambriaApiException(errRes);
154
155                 }
156                 
157                 catch (ConfigDbException | UnavailableException | IOException e) {
158                         log.error("Error while reading data from topic [" + topic + "].", e);
159                 
160                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
161                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
162                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
163                                                         consumerid,
164                                         request.getRemoteHost());
165                         log.info(errRes.toString());
166                         throw new CambriaApiException(errRes);
167
168                 }
169         }
170
171         /**
172          * This method is used to publish messages.Taking two parameter topic and
173          * partition.Publisher decide to which topic they want to publish message
174          * and kafka decide to which partition of topic message will send,
175          * 
176          * @param topic
177          * @param msg
178          * @param partitionKey
179          * 
180          *            handles CambriaApiException | ConfigDbException |
181          *            TopicExistsException | AccessDeniedException | IOException in
182          *            try catch block
183          * @throws CambriaApiException
184          */
185
186         @POST
187         @Produces("application/json")
188         @Path("/{topic}")
189         public void pushEvents(@PathParam("topic") String topic, InputStream msg,
190                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
191                 log.info("Publishing message to topic " + topic);
192
193                 try {
194                         eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
195                 } 
196                 catch ( TopicExistsException  e) {
197                         log.error("Error while publishing to topic [" + topic + "].", e);
198
199                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
200                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
201                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
202                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
203                         log.info(errRes.toString());
204                         throw new CambriaApiException(errRes);
205                 }
206                 catch ( DMaaPAccessDeniedException | AccessDeniedException  e) {
207                         log.error("Error while publishing to topic [" + topic + "].", e);
208
209                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
210                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
211                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
212                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
213                         log.info(errRes.toString());
214                         throw new CambriaApiException(errRes);
215                 }
216                 
217                 
218                 catch (ConfigDbException |   IOException | missingReqdSetting e) {
219                         log.error("Error while publishing to topic [" + topic + "].", e);
220
221                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
222                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
223                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
224                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
225                         log.info(errRes.toString());
226                         throw new CambriaApiException(errRes);
227                 }
228         }
229
230         /**
231          * This method is used to publish messages by passing an optional header
232          * called 'transactionId'. If the 'transactionId' is not provided in the
233          * input then a new transaction object will be created. Else the existing
234          * transaction object will be updated with the counter details.
235          * 
236          * @param topic
237          * @param partitionKey
238          * 
239          *            handles CambriaApiException | ConfigDbException |
240          *            TopicExistsException | AccessDeniedException | IOException in
241          *            try catch block
242          * @throws CambriaApiException
243          */
244         @POST
245         @Produces("application/json")
246         @Path("/transaction/{topic}")
247         public void pushEventsWithTransaction(@PathParam("topic") String topic,
248                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
249                 // log.info("Publishing message with transaction id for topic " + topic
250                 // );
251
252                 try {
253                         eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), 
254                                         partitionKey,
255                                         Utils.getFormattedDate(new Date()));
256                 } 
257                 
258                 catch ( TopicExistsException  e) {
259                         log.error("Error while publishing to topic [" + topic + "].", e);
260
261                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
262                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
263                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
264                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
265                         log.info(errRes.toString());
266                         throw new CambriaApiException(errRes);
267                 }
268                 catch ( DMaaPAccessDeniedException| AccessDeniedException  e) {
269                         log.error("Error while publishing to topic [" + topic + "].", e);
270
271                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
272                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
273                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
274                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
275                         log.info(errRes.toString());
276                         throw new CambriaApiException(errRes);
277                 }
278                 
279                 catch (ConfigDbException  | IOException | missingReqdSetting  e) {
280                         log.error("Error while publishing to topic : " + topic, e);
281
282                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
283                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
284                                                         + errorMessages.getPublishMsgError() + e.getMessage(), null,
285                                         Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), 
286                                         request.getRemoteHost(),
287                                         null, null);
288                         log.info(errRes.toString());
289                         throw new CambriaApiException(errRes);
290
291                 }
292         }
293
294         /**
295          * This method is used for taking Configuration Object,HttpServletRequest
296          * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
297          * Object.
298          * 
299          * @return DMaaPContext object from where user can get Configuration
300          *         Object,HttpServlet Object
301          * 
302          */
303         private DMaaPContext getDmaapContext() {
304
305                 DMaaPContext dmaapContext = new DMaaPContext();
306                 dmaapContext.setRequest(request);
307                 dmaapContext.setResponse(response);
308                 dmaapContext.setConfigReader(configReader);
309
310                 return dmaapContext;
311         }
312
313 }