add test cases after the kafka 11 upgrade changes
[dmaap/messagerouter/messageservice.git] / src / main / java / com / att / nsa / dmaap / service / EventsRestService.java
index 6fbfd01..40468a3 100644 (file)
@@ -8,14 +8,14 @@
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
  *        http://www.apache.org/licenses/LICENSE-2.0
- *  
+*  
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  *  ============LICENSE_END=========================================================
- *
+ *  
  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
  *  
  *******************************************************************************/
@@ -42,25 +42,24 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Component;
 
-import com.att.nsa.cambria.CambriaApiException;
-import com.att.nsa.cambria.backends.ConsumerFactory.UnavailableException;
-import com.att.nsa.cambria.beans.DMaaPContext;
-import com.att.nsa.cambria.exception.DMaaPErrorMessages;
-import com.att.nsa.cambria.exception.DMaaPResponseCode;
-import com.att.nsa.cambria.exception.ErrorResponse;
-import com.att.nsa.cambria.metabroker.Broker.TopicExistsException;
-import com.att.nsa.cambria.service.EventsService;
-import com.att.nsa.cambria.utils.ConfigurationReader;
-import com.att.nsa.cambria.utils.Utils;
+import com.att.dmf.mr.CambriaApiException;
+import com.att.dmf.mr.backends.ConsumerFactory.UnavailableException;
+import com.att.dmf.mr.beans.DMaaPContext;
+import com.att.dmf.mr.exception.DMaaPErrorMessages;
+import com.att.dmf.mr.exception.DMaaPResponseCode;
+import com.att.dmf.mr.exception.ErrorResponse;
+import com.att.dmf.mr.metabroker.Broker.TopicExistsException;
+import com.att.dmf.mr.service.EventsService;
+import com.att.dmf.mr.utils.ConfigurationReader;
+import com.att.dmf.mr.utils.Utils;
 import com.att.nsa.configs.ConfigDbException;
 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
 import com.att.nsa.security.ReadWriteSecuredResource.AccessDeniedException;
-import com.att.nsa.cambria.exception.DMaaPAccessDeniedException;
-
+import com.att.dmf.mr.exception.DMaaPAccessDeniedException;
 /**
- * This class is a CXF REST service which acts as gateway for MR Event Service.
- * 
- * @author author
+ * This class is a CXF REST service which acts 
+ * as gateway for MR Event Service.
+ * @author rajashree.khare
  *
  */
 @Component
@@ -70,8 +69,7 @@ public class EventsRestService {
        /**
         * Logger obj
         */
-       // private Logger log =
-       // Logger.getLogger(EventsRestService.class.toString());
+       //private Logger log = Logger.getLogger(EventsRestService.class.toString());
        private static final EELFLogger log = EELFManager.getInstance().getLogger(EventsRestService.class);
        /**
         * HttpServletRequest obj
@@ -85,6 +83,7 @@ public class EventsRestService {
        @Context
        private HttpServletResponse response;
 
+
        /**
         * Config Reader
         */
@@ -98,8 +97,6 @@ public class EventsRestService {
        @Autowired
        private DMaaPErrorMessages errorMessages;
 
-       private DMaaPContext dmaapContext = new DMaaPContext();
-
        /**
         * This method is used to consume messages.Taking three parameter
         * topic,consumerGroup and consumerId .Consumer decide to which topic they
@@ -121,49 +118,120 @@ public class EventsRestService {
         */
        @GET
        @Path("/{topic}/{consumergroup}/{consumerid}")
-       public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") String consumergroup,
+       public void getEvents(@PathParam("topic") String topic, @PathParam("consumergroup") 
+       String consumergroup,
                        @PathParam("consumerid") String consumerid) throws CambriaApiException {
                // log.info("Consuming message from topic " + topic );
-               dmaapContext = getDmaapContext();
-               dmaapContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+               DMaaPContext dMaaPContext = getDmaapContext();
+               dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
 
                try {
 
-                       eventsService.getEvents(dmaapContext, topic, consumergroup, consumerid);
-               } catch (TopicExistsException e) {
+                       eventsService.getEvents(dMaaPContext, topic, consumergroup, consumerid);
+               } 
+               catch (TopicExistsException  e) {
                        log.error("Error while reading data from topic [" + topic + "].", e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
-                                       DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
-                                       errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
-                                       topic, null, null, consumerid, request.getRemoteHost());
+                                       DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+                                                       + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
+                                                       consumerid,
+                                       request.getRemoteHost());
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
 
-               } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+               }
+               catch (DMaaPAccessDeniedException | AccessDeniedException  e) {
                        log.error("Error while reading data from topic [" + topic + "].", e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
-                                       errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
-                                       topic, null, null, consumerid, request.getRemoteHost());
+                                       DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+                                                       + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
+                                                       consumerid,
+                                       request.getRemoteHost());
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
 
                }
-
+               
                catch (ConfigDbException | UnavailableException | IOException e) {
                        log.error("Error while reading data from topic [" + topic + "].", e);
+               
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+                                       DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), errorMessages.getConsumeMsgError()
+                                                       + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic, null, null, 
+                                                       consumerid,
+                                       request.getRemoteHost());
+                       log.info(errRes.toString());
+                       throw new CambriaApiException(errRes);
+
+               }
+       }
+       
+       
+       /**
+        * This method is used to throw an exception back to the client app if CG/CID is not passed
+        *  while consuming messages
+        */
+       @GET
+       @Path("/{topic}")
+       public void getEventsToException(@PathParam("topic") String topic) throws CambriaApiException {
+               // log.info("Consuming message from topic " + topic );
+               DMaaPContext dMaaPContext = getDmaapContext();
+               dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+
+               try {
+
+                       throw new TopicExistsException("Incorrect URL");
+               } 
+               catch (TopicExistsException  e) {
+                       log.error("Error while reading data from topic [" + topic + "].", e);
+
+                       ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
+                                       DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer Group and ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
+                                       );
+                       log.info(errRes.toString());
+                       throw new CambriaApiException(errRes);
+
+               }
+               
+       }
+       
+       /**
+        * This method is used to throw an exception back to the client app if CG/CID is not passed
+        *  while consuming messages
+        */
+       @GET
+       @Path("/{topic}/{consumergroup}")
+       public void getEventsToException(@PathParam("topic") String topic, @PathParam("consumergroup") 
+       String consumergroup
+                       ) throws CambriaApiException {
+               // log.info("Consuming message from topic " + topic );
+               DMaaPContext dMaaPContext = getDmaapContext();
+               dMaaPContext.setConsumerRequestTime(Utils.getFormattedDate(new Date()));
+
+               try {
+
+                       throw new TopicExistsException("Incorrect URL");
+               } 
+               catch (TopicExistsException  e) {
+                       log.error("Error while reading data from topic [" + topic + "].", e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(),
-                                       errorMessages.getConsumeMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
-                                       topic, null, null, consumerid, request.getRemoteHost());
+                                       DMaaPResponseCode.CONSUME_MSG_ERROR.getResponseCode(), "Incorrect url - Expects consumer ID in " + request.getRequestURI() + " from "+request.getRemoteHost()
+                                       );
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
 
                }
+               
        }
+       
+       
+       
+       
+       
+       
 
        /**
         * This method is used to publish messages.Taking two parameter topic and
@@ -189,33 +257,36 @@ public class EventsRestService {
 
                try {
                        eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
-               } catch (TopicExistsException e) {
+               } 
+               catch ( TopicExistsException  e) {
                        log.error("Error while publishing to topic [" + topic + "].", e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
-                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
-                                       errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
-                                       topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+                                                       + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+                                       Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-               } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+               }
+               catch ( DMaaPAccessDeniedException | AccessDeniedException  e) {
                        log.error("Error while publishing to topic [" + topic + "].", e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
-                                       errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
-                                       topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+                                                       + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+                                       Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                }
-
-               catch (ConfigDbException | IOException | missingReqdSetting e) {
+               
+               
+               catch (ConfigDbException |   IOException | missingReqdSetting e) {
                        log.error("Error while publishing to topic [" + topic + "].", e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
-                                       errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
-                                       topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+                                                       + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+                                       Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                }
@@ -244,37 +315,40 @@ public class EventsRestService {
                // );
 
                try {
-                       eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey,
+                       eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), 
+                                       partitionKey,
                                        Utils.getFormattedDate(new Date()));
-               }
-
-               catch (TopicExistsException e) {
+               } 
+               
+               catch ( TopicExistsException  e) {
                        log.error("Error while publishing to topic [" + topic + "].", e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_CONFLICT,
-                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
-                                       errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
-                                       topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+                                                       + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+                                       Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
-               } catch (DMaaPAccessDeniedException | AccessDeniedException e) {
+               }
+               catch ( DMaaPAccessDeniedException| AccessDeniedException  e) {
                        log.error("Error while publishing to topic [" + topic + "].", e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_UNAUTHORIZED,
-                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
-                                       errorMessages.getPublishMsgError() + e.getMessage(), null, Utils.getFormattedDate(new Date()),
-                                       topic, Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
+                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), errorMessages.getPublishMsgError()
+                                                       + e.getMessage(), null, Utils.getFormattedDate(new Date()), topic,
+                                       Utils.getUserApiKey(request), request.getRemoteHost(), null, null);
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
                }
-
-               catch (ConfigDbException | IOException | missingReqdSetting e) {
+               
+               catch (ConfigDbException  | IOException | missingReqdSetting  e) {
                        log.error("Error while publishing to topic : " + topic, e);
 
                        ErrorResponse errRes = new ErrorResponse(HttpStatus.SC_NOT_FOUND,
-                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(),
-                                       "Transaction-" + errorMessages.getPublishMsgError() + e.getMessage(), null,
-                                       Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), request.getRemoteHost(),
+                                       DMaaPResponseCode.PUBLISH_MSG_ERROR.getResponseCode(), "Transaction-"
+                                                       + errorMessages.getPublishMsgError() + e.getMessage(), null,
+                                       Utils.getFormattedDate(new Date()), topic, Utils.getUserApiKey(request), 
+                                       request.getRemoteHost(),
                                        null, null);
                        log.info(errRes.toString());
                        throw new CambriaApiException(errRes);
@@ -293,6 +367,7 @@ public class EventsRestService {
         */
        private DMaaPContext getDmaapContext() {
 
+               DMaaPContext dmaapContext = new DMaaPContext();
                dmaapContext.setRequest(request);
                dmaapContext.setResponse(response);
                dmaapContext.setConfigReader(configReader);