[DMAAP-MR] Remove acl check on topics
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / service / EventsRestService.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22  package org.onap.dmaap.service;
23
24 import java.io.ByteArrayInputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.util.Date;
28
29 import javax.servlet.http.HttpServletRequest;
30 import javax.servlet.http.HttpServletResponse;
31 import javax.ws.rs.GET;
32 import javax.ws.rs.POST;
33 import javax.ws.rs.Path;
34 import javax.ws.rs.PathParam;
35 import javax.ws.rs.Produces;
36 import javax.ws.rs.QueryParam;
37 import javax.ws.rs.core.Context;
38
39 import org.apache.http.HttpStatus;
40 import com.att.eelf.configuration.EELFLogger;
41 import com.att.eelf.configuration.EELFManager;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.beans.factory.annotation.Qualifier;
44 import org.springframework.stereotype.Component;
45
46 import org.onap.dmaap.dmf.mr.CambriaApiException;
47 import org.onap.dmaap.dmf.mr.backends.ConsumerFactory.UnavailableException;
48 import org.onap.dmaap.dmf.mr.beans.DMaaPContext;
49 import org.onap.dmaap.dmf.mr.exception.DMaaPErrorMessages;
50 import org.onap.dmaap.dmf.mr.exception.DMaaPResponseCode;
51 import org.onap.dmaap.dmf.mr.exception.ErrorResponse;
52 import org.onap.dmaap.dmf.mr.metabroker.Broker.TopicExistsException;
53 import org.onap.dmaap.dmf.mr.service.EventsService;
54 import org.onap.dmaap.dmf.mr.utils.ConfigurationReader;
55 import org.onap.dmaap.dmf.mr.utils.Utils;
56 import com.att.nsa.configs.ConfigDbException;
57 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
58 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
59 import org.onap.dmaap.dmf.mr.exception.DMaaPAccessDeniedException;
60 /**
61  * This class is a CXF REST service which acts 
62  * as gateway for MR Event Service.
63  * @author rajashree.khare
64  *
65  */
66 @Component
67 @Path("/")
68 public class EventsRestService {
69
70         /**
71          * Logger obj
72          */
73         private static final EELFLogger log = EELFManager.getLogger(EventsRestService.class);
74         /**
75          * HttpServletRequest obj
76          */
77         @Context
78         private HttpServletRequest request;
79
80         /**
81          * HttpServletResponse obj
82          */
83         @Context
84         private HttpServletResponse response;
85
86
87         /**
88          * Config Reader
89          */
90         @Autowired
91         @Qualifier("configurationReader")
92         private ConfigurationReader configReader;
93
94         @Autowired
95         private EventsService eventsService;
96
97         @Autowired
98         private DMaaPErrorMessages errorMessages;
99         
100
101         /**
102          * This method is used to consume messages.Taking three parameter
103          * topic,consumerGroup and consumerId .Consumer decide to which topic they
104          * want to consume messages.In on consumer Group there might be many
105          * consumer may be present.
106          * 
107          * @param topic
108          *            specify- the topic name
109          * @param consumergroup
110          *            - specify the consumer group
111          * @param consumerid
112          *            -specify the consumer id
113          * 
114          *            handles CambriaApiException | ConfigDbException |
115          *            TopicExistsException | AccessDeniedException |
116          *            UnavailableException | IOException in try catch block
117          * @throws CambriaApiException
118          * 
119          */
120         @GET
121         @Path("/{topic}/{consumergroup}/{consumerid}")
122         public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") 
123         String consumergroup,
124                         @PathParam("consumerid") String consumerid) throws CambriaApiException {
125                 log.info("Consuming message from topic " + topic );
126                 DMaaPContext dMaaPContext = getDmaapContext();
127                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
128
129                 try {
130                         eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
131                 } catch (TopicExistsException  e) {
132                         log.error("Error while reading data from topic [" + topic + "].", e);
133
134                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
135                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
136                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
137                                                         consumerid,
138                                         request.getRemoteHost());
139                         log.info(errRes.toString());
140                         throw new CambriaApiException(errRes);
141                 } catch (DMaaPAccessDeniedException | AccessDeniedException  e) {
142                         log.error("Error while reading data from topic [" + topic + "].", e);
143                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
144                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
145                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
146                                                         consumerid,
147                                         request.getRemoteHost());
148                         log.info(errRes.toString());
149                         throw new CambriaApiException(errRes);
150                 } catch (ConfigDbException | UnavailableException | IOException e) {
151                         log.error("Error while reading data from topic [" + topic + "].", e);
152                 
153                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
154                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
155                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
156                                                         consumerid,
157                                         request.getRemoteHost());
158                         log.info(errRes.toString());
159                         throw new CambriaApiException(errRes);
160                 }
161         }
162         
163         
164         /**
165          * This method is used to throw an exception back to the client app if CG/CID is not passed
166          *  while consuming messages
167          */
168         @GET
169         @Path("/{topic}")
170         public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
171                 // log.info("Consuming message from topic " + topic );
172                 DMaaPContext dMaaPContext = getDmaapContext();
173                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
174
175                 try {
176
177                         throw new TopicExistsException("Incorrect URL");
178                 } catch (TopicExistsException  e) {
179                         log.error("Error while reading data from topic [" + topic + "].", e);
180
181                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
182                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
183                                         );
184                         log.info(errRes.toString());
185                         throw new CambriaApiException(errRes);
186                 }
187         }
188         
189         /**
190          * This method is used to throw an exception back to the client app if CG/CID is not passed
191          *  while consuming messages
192          */
193         @GET
194         @Path("/{topic}/{consumergroup}")
195         public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup") 
196         String consumergroup) throws CambriaApiException {
197                 // log.info("Consuming message from topic " + topic );
198                 DMaaPContext dMaaPContext = getDmaapContext();
199                 dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
200                 try {
201                         throw new TopicExistsException("Incorrect URL");
202                 } catch (TopicExistsException  e) {
203                         log.error("Error while reading data from topic [" + topic + "].", e);
204
205                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
206                                         DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
207                                         );
208                         log.info(errRes.toString());
209                         throw new CambriaApiException(errRes);
210                 }
211         }
212
213         /**
214          * This method is used to publish messages.Taking two parameter topic and
215          * partition.Publisher decide to which topic they want to publish message
216          * and kafka decide to which partition of topic message will send,
217          * 
218          * @param topic
219          * @param msg
220          * @param partitionKey
221          * 
222          *            handles CambriaApiException | ConfigDbException |
223          *            TopicExistsException | AccessDeniedException | IOException in
224          *            try catch block
225          * @throws CambriaApiException
226          */
227
228         @POST
229         @Produces("application/json")
230         @Path("/{topic}")
231         public void pushEvents(@PathParam("topic") String topic, InputStream msg,
232                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
233                 log.info("Publishing message to topic " + topic);
234                 try {
235                         eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
236                 } catch ( TopicExistsException  e) {
237                         log.error("Error while publishing to topic [" + topic + "].", e);
238
239                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
240                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
241                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
242                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
243                         log.info(errRes.toString());
244                         throw new CambriaApiException(errRes);
245                 } catch ( DMaaPAccessDeniedException | AccessDeniedException  e) {
246                         log.error("Error while publishing to topic [" + topic + "].", e);
247
248                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
249                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
250                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
251                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
252                         log.info(errRes.toString());
253                         throw new CambriaApiException(errRes);
254                 } catch (ConfigDbException |   IOException | missingReqdSetting e) {
255                         log.error("Error while publishing to topic [" + topic + "].", e);
256
257                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
258                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
259                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
260                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
261                         log.info(errRes.toString());
262                         throw new CambriaApiException(errRes);
263                 }
264         }
265
266         /**
267          * This method is used to publish messages by passing an optional header
268          * called 'transactionId'. If the 'transactionId' is not provided in the
269          * input then a new transaction object will be created. Else the existing
270          * transaction object will be updated with the counter details.
271          * 
272          * @param topic
273          * @param partitionKey
274          * 
275          *            handles CambriaApiException | ConfigDbException |
276          *            TopicExistsException | AccessDeniedException | IOException in
277          *            try catch block
278          * @throws CambriaApiException
279          */
280         @POST
281         @Produces("application/json")
282         @Path("/transaction/{topic}")
283         public void pushEventsWithTransaction(@PathParam("topic") String topic,
284                         @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
285                 try {
286                         eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), 
287                                         partitionKey,
288                                         Utils.getFormattedDate(new Date()));
289                 } catch ( TopicExistsException  e) {
290                         log.error("Error while publishing to topic [" + topic + "].", e);
291
292                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
293                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
294                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
295                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
296                         log.info(errRes.toString());
297                         throw new CambriaApiException(errRes);
298                 } catch ( DMaaPAccessDeniedException| AccessDeniedException  e) {
299                         log.error("Error while publishing to topic [" + topic + "].", e);
300
301                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
302                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
303                                                         + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
304                                         Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
305                         log.info(errRes.toString());
306                         throw new CambriaApiException(errRes);
307                 } catch (ConfigDbException  | IOException | missingReqdSetting  e) {
308                         log.error("Error while publishing to topic : " + topic, e);
309
310                         ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
311                                         DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
312                                                         + errorMessages.getPublishMsgError() + e.getMessage(), null,
313                                         Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), 
314                                         request.getRemoteHost(),
315                                         null, null);
316                         log.info(errRes.toString());
317                         throw new CambriaApiException(errRes);
318                 }
319         }
320
321         /**
322          * This method is used for taking Configuration Object,HttpServletRequest
323          * Object,HttpServletRequest HttpServletResponse Object,HttpServletSession
324          * Object.
325          * 
326          * @return DMaaPContext object from where user can get Configuration
327          *         Object,HttpServlet Object
328          * 
329          */
330         private DMaaPContext getDmaapContext() {
331
332                 DMaaPContext dmaapContext = new DMaaPContext();
333                 dmaapContext.setRequest(request);
334                 dmaapContext.setResponse(response);
335                 dmaapContext.setConfigReader(configReader);
336
337                 return dmaapContext;
338         }
339         
340
341 }