package org.onap.appc.listener.LCM.impl;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.commons.lang3.StringUtils;
-import org.onap.appc.listener.AbstractListener;
-import org.onap.appc.listener.ListenerProperties;
-import org.onap.appc.listener.LCM.conv.Converter;
-import org.onap.appc.listener.LCM.model.DmaapIncomingMessage;
-import org.onap.appc.listener.LCM.operation.ProviderOperations;
-
import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.eelf.i18n.EELFResourceManager;
-
+import com.fasterxml.jackson.databind.JsonNode;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.RejectedExecutionException;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.appc.listener.AbstractListener;
+import org.onap.appc.listener.LCM.conv.Converter;
+import org.onap.appc.listener.LCM.model.DmaapIncomingMessage;
+import org.onap.appc.listener.LCM.operation.ProviderOperations;
+import org.onap.appc.listener.ListenerProperties;
public class ListenerImpl extends AbstractListener {
super(props);
String url = props.getProperty("provider.url");
- LOG.info("DMaaP Provider Endpoint: " + url);
- providerOperations = new ProviderOperations();
- providerOperations.setUrl(url);
-
- // Set Basic Auth
String user = props.getProperty("provider.user");
String pass = props.getProperty("provider.pass");
- providerOperations.setAuthentication(user, pass);
+ providerOperations = new ProviderOperations(url, user, pass);
+ LOG.info("DMaaP Provider Endpoint: " + url);
}
@Override
if (executor.getQueue().size() <= QUEUED_MIN) {
LOG.debug("DMaaP queue running low. Querying for more jobs");
-
- List<DmaapIncomingMessage> messages = dmaap.getIncomingEvents(DmaapIncomingMessage.class, QUEUED_MAX);
+ List<DmaapIncomingMessage> messages = dmaap
+ .getIncomingEvents(DmaapIncomingMessage.class, QUEUED_MAX);
LOG.debug(String.format("Read %d messages from dmaap", messages.size()));
for (DmaapIncomingMessage incoming : messages) {
// Acknowledge that we read the event
}
} else {
// Badly formed message
- LOG.error("Message was not valid. Rejecting message: "+incoming);
+ LOG.error("Message was not valid. Rejecting message: " + incoming);
}
} else {
if (isValid(incoming)) {
LOG.info("Run stopped. Orphaning Message: " + requestIdWithSubId);
- }
- else {
+ } else {
// Badly formed message
- LOG.error("Message was not valid. Rejecting message: "+incoming);
+ LOG.error("Message was not valid. Rejecting message: " + incoming);
}
}
}
private boolean isValid(DmaapIncomingMessage incoming) {
return ((incoming != null) &&
- incoming.getBody() != null
- && !StringUtils.isEmpty(incoming.getRpcName()));
+ incoming.getBody() != null
+ && !StringUtils.isEmpty(incoming.getRpcName()));
}
@Override
String runningTime = df.format(new Date(time - startTime));
String out = String.format("Running for %s and completed %d jobs using %d threads.", runningTime,
- executor.getCompletedTaskCount(), executor.getPoolSize());
+ executor.getCompletedTaskCount(), executor.getPoolSize());
LOG.info("***BENCHMARK*** " + out);
return out;
}
- private String getRequestIdWithSubId(JsonNode event){
+ private String getRequestIdWithSubId(JsonNode event) {
String requestId = "";
try {
requestId = Converter.extractRequestIdWithSubId(event);