Fix for Kafka Consumer is not safe error 85/74285/1
authorsunil unnava <sunil.unnava@att.com>
Wed, 5 Dec 2018 18:00:13 +0000 (13:00 -0500)
committersunil unnava <sunil.unnava@att.com>
Wed, 5 Dec 2018 18:03:06 +0000 (13:03 -0500)
Pre create a dummy topic and subscribe to it during first POST call for
posting a message to create __consumer_offsets topic

Issue-ID: DMAAP-896
Change-Id: I11f3f9b8764232bc7d4e9bb270d5d73fc280cb80
Signed-off-by: sunil unnava <sunil.unnava@att.com>
src/main/java/org/onap/dmaap/service/EventsRestService.java

index 2672261..b4aee10 100644 (file)
@@ -96,6 +96,8 @@ public class EventsRestService {
 
        @Autowired
        private DMaaPErrorMessages errorMessages;
+       
+       private boolean isOffsetTopicCreated=false;
 
        /**
         * This method is used to consume messages.Taking three parameter
@@ -254,7 +256,10 @@ public class EventsRestService {
        public void pushEvents(@PathParam("topic") String topic, InputStream msg,
                        @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
                log.info("Publishing message to topic " + topic);
-
+            
+              if(!isOffsetTopicCreated){
+                  preCreateOffsetTopic(msg);   
+              }
                try {
                        eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
                } 
@@ -313,8 +318,14 @@ public class EventsRestService {
                        @QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
                // log.info("Publishing message with transaction id for topic " + topic
                // );
+               
 
                try {
+                       
+                       if(!isOffsetTopicCreated){
+                       preCreateOffsetTopic(request.getInputStream());   
+                     }
+                       
                        eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), 
                                        partitionKey,
                                        Utils.getFormattedDate(new Date()));
@@ -374,5 +385,18 @@ public class EventsRestService {
 
                return dmaapContext;
        }
+       
+       private void preCreateOffsetTopic(InputStream msg) {
+
+               try {
+                       eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null);
+                       eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1");
+                       isOffsetTopicCreated = true;
+               } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException
+                               | missingReqdSetting | UnavailableException e) {
+                       log.error("Error while creating the dummy topic", e);
+               }
+
+       }
 
 }
\ No newline at end of file