Fix for Kafka Consumer is not safe error
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / service / EventsRestService.java
index 2672261..b4aee10 100644 (file)
@@ -96,6 +96,8 @@ public class EventsRestService {
 
        @Autowired
        private DMaaPErrorMessages errorMessages;
+       
+       private boolean isOffsetTopicCreated=false;
 
        /**
         * This method is used to consume messages.Taking three parameter
@@ -254,7 +256,10 @@ public class EventsRestService {
        public void pushEvents(@PathParam("topic") String topic, InputStream msg,
                        @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
                log.info("Publishing message to topic " + topic);
-
+            
+              if(!isOffsetTopicCreated){
+                  preCreateOffsetTopic(msg);   
+              }
                try {
                        eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
                } 
@@ -313,8 +318,14 @@ public class EventsRestService {
                        @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
                // log.info("Publishing message with transaction id for topic " + topic
                // );
+               
 
                try {
+                       
+                       if(!isOffsetTopicCreated){
+                       preCreateOffsetTopic(request.getInputStream());   
+                     }
+                       
                        eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), 
                                        partitionKey,
                                        Utils.getFormattedDate(new Date()));
@@ -374,5 +385,18 @@ public class EventsRestService {
 
                return dmaapContext;
        }
+       
+       private void preCreateOffsetTopic(InputStream msg) {
+
+               try {
+                       eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null);
+                       eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1");
+                       isOffsetTopicCreated = true;
+               } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException
+                               | missingReqdSetting | UnavailableException e) {
+                       log.error("Error while creating the dummy topic", e);
+               }
+
+       }
 
 }
\ No newline at end of file