private ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
public void addSubscriber(Subscriber subscriber) {
- if (!pollingRequests.containsKey(subscriber.getUrl())) {
+ if (!pollingRequests.containsKey(subscriber.getTopic())) {
DMaaPPollingRequest pollingTask = new DMaaPPollingRequest(subscriber, droolsEngine);
ScheduledFuture future = service
.scheduleAtFixedRate(pollingTask, 0, subscriber.getPeriod(), TimeUnit.MILLISECONDS);
- pollingRequests.put(subscriber.getUrl(), future);
+ pollingRequests.put(subscriber.getTopic(), future);
}
}
public void removeSubscriber(Subscriber subscriber) {
- ScheduledFuture future = pollingRequests.get(subscriber.getUrl());
+ ScheduledFuture future = pollingRequests.get(subscriber.getTopic());
if (future != null) {
future.cancel(true);
}
- pollingRequests.remove(subscriber.getUrl());
+ pollingRequests.remove(subscriber.getTopic());
}
}