private long startTime = 0;
+ private final ProviderOperations providerOperations;
+
public ListenerImpl(ListenerProperties props) {
super(props);
String url = props.getProperty("provider.url");
LOG.info("DMaaP Provider Endpoint: " + url);
- ProviderOperations.setUrl(url);
+ providerOperations = new ProviderOperations();
+ providerOperations.setUrl(url);
// Set Basic Auth
String user = props.getProperty("provider.user");
String pass = props.getProperty("provider.pass");
- ProviderOperations.setAuthentication(user, pass);
+ providerOperations.setAuthentication(user, pass);
}
@Override
if (isValid(incoming)) {
LOG.info(String.format("Adding DMaaP message to pool queue [%s]", requestIdWithSubId));
try {
- executor.execute(new WorkerImpl(incoming.getRpcName(),incoming.getBody(), dmaap));
+ executor.execute(new WorkerImpl(incoming, dmaap, providerOperations));
} catch (RejectedExecutionException rejectEx) {
LOG.error("Task Rejected: ", rejectEx);
}