import org.onap.appc.executionqueue.MessageExpirationListener;
import org.onap.appc.executionqueue.impl.object.QueueMessage;
-import java.time.Instant;
+import java.util.Calendar;
+import java.util.Date;
import java.util.concurrent.TimeUnit;
public class ExecutionQueueServiceImpl<M extends Runnable> implements ExecutionQueueService<M> {
- private static final EELFLogger logger =
- EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
+ private final EELFLogger logger = EELFManager.getInstance().getLogger(ExecutionQueueServiceImpl.class);
private QueueManager queueManager;
- public ExecutionQueueServiceImpl() {
- //do nothing
+ public ExecutionQueueServiceImpl(){
+
+ }
+
+ @Override
+ public void putMessage(M message) throws APPCException {
+ this.putMessage(message,-1,null);
}
/**
* Injected by blueprint
- *
- * @param queueManager queue manager to be set
+ * @param queueManager
*/
public void setQueueManager(QueueManager queueManager) {
this.queueManager = queueManager;
}
@Override
- public void putMessage(M message) throws APPCException {
- this.putMessage(message, -1, null);
- }
+ public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{
+ QueueMessage queueMessage;
- @Override
- public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException {
- Instant expirationTime = calculateExpirationTime(timeout, unit);
- boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<>(message, expirationTime));
- if (!enqueueTask) {
- logger.error("Error in putMessage method of ExecutionQueueServiceImpl");
- throw new APPCException("Failed to put message in queue");
+ try {
+ Date expirationTime = calculateExpirationTime(timeout,unit);
+ queueMessage = new QueueMessage(message,expirationTime);
+ boolean enqueueTask = queueManager.enqueueTask(queueMessage);
+ if(!enqueueTask){
+ throw new APPCException("failed to put message in queue");
+ }
+ } catch (Exception e) {
+ logger.error("Error in putMessage method of ExecutionQueueServiceImpl" + e.getMessage());
+ throw new APPCException(e);
}
}
- @Override
- public void registerMessageExpirationListener(MessageExpirationListener listener) {
- queueManager.setListener(listener);
- }
-
- private Instant calculateExpirationTime(long timeToLive, TimeUnit unit) {
- if (timeToLive > 0 && unit != null) {
- // as of Java 8, there is no built-in conversion method from
- // TimeUnit to ChronoUnit; do it manually
- return Instant.now().plusMillis(unit.toMillis(timeToLive));
- } else {
- // never expires
- return Instant.MAX;
+ private Date calculateExpirationTime(long timeToLive, TimeUnit unit) {
+ Date expirationTime = null;
+ if(timeToLive > 0){
+ long currentTime = System.currentTimeMillis();
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(currentTime + unit.toMillis(timeToLive));
+ expirationTime = cal.getTime();
}
+ return expirationTime;
}
}