From: sunil unnava Date: Wed, 5 Dec 2018 18:00:13 +0000 (-0500) Subject: Fix for Kafka Consumer is not safe error X-Git-Tag: 1.0.10~4 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=2b80d1a99615392a791fa15f04085601f13fbaba;p=dmaap%2Fmessagerouter%2Fmessageservice.git Fix for Kafka Consumer is not safe error Pre create a dummy topic and subscribe to it during first POST call for posting a message to create __consumer_offsets topic Issue-ID: DMAAP-896 Change-Id: I11f3f9b8764232bc7d4e9bb270d5d73fc280cb80 Signed-off-by: sunil unnava --- diff --git a/src/main/java/org/onap/dmaap/service/EventsRestService.java b/src/main/java/org/onap/dmaap/service/EventsRestService.java index 2672261..b4aee10 100644 --- a/src/main/java/org/onap/dmaap/service/EventsRestService.java +++ b/src/main/java/org/onap/dmaap/service/EventsRestService.java @@ -96,6 +96,8 @@ public class EventsRestService { @Autowired private DMaaPErrorMessages errorMessages; + + private boolean isOffsetTopicCreated=false; /** * This method is used to consume messages.Taking three parameter @@ -254,7 +256,10 @@ public class EventsRestService { public void pushEvents(@PathParam("topic") String topic, InputStream msg, @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { log.info("Publishing message to topic " + topic); - + + if(!isOffsetTopicCreated){ + preCreateOffsetTopic(msg); + } try { eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null); } @@ -313,8 +318,14 @@ public class EventsRestService { @QueryParam("partitionKey") String partitionKey) throws CambriaApiException { // log.info("Publishing message with transaction id for topic " + topic // ); + try { + + if(!isOffsetTopicCreated){ + preCreateOffsetTopic(request.getInputStream()); + } + eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(), partitionKey, Utils.getFormattedDate(new Date())); @@ -374,5 +385,18 @@ public class EventsRestService { return dmaapContext; } + + private void preCreateOffsetTopic(InputStream msg) { + + try { + eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null); + eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1"); + isOffsetTopicCreated = true; + } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException + | missingReqdSetting | UnavailableException e) { + log.error("Error while creating the dummy topic", e); + } + + } } \ No newline at end of file