@Autowired
private DMaaPErrorMessages errorMessages;
+
+ private boolean isOffsetTopicCreated=false;
/**
* This method is used to consume messages.Taking three parameter
public void pushEvents(@PathParam("topic") String topic, InputStream msg,
@QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
log.info("Publishing message to topic " + topic);
-
+
+ if(!isOffsetTopicCreated){
+ preCreateOffsetTopic(msg);
+ }
try {
eventsService.pushEvents(getDmaapContext(), topic, msg, partitionKey, null);
}
@QueryParam("partitionKey") String partitionKey) throws CambriaApiException {
// log.info("Publishing message with transaction id for topic " + topic
// );
+
try {
+
+ if(!isOffsetTopicCreated){
+ preCreateOffsetTopic(request.getInputStream());
+ }
+
eventsService.pushEvents(getDmaapContext(), topic, request.getInputStream(),
partitionKey,
Utils.getFormattedDate(new Date()));
return dmaapContext;
}
+
+ private void preCreateOffsetTopic(InputStream msg) {
+
+ try {
+ eventsService.pushEvents(getDmaapContext(), "DUMMY_TOPIC", msg, null, null);
+ eventsService.getEvents(getDmaapContext(), "DUMMY_TOPIC", "CG1", "C1");
+ isOffsetTopicCreated = true;
+ } catch (CambriaApiException | ConfigDbException | AccessDeniedException | TopicExistsException | IOException
+ | missingReqdSetting | UnavailableException e) {
+ log.error("Error while creating the dummy topic", e);
+ }
+
+ }
}
\ No newline at end of file