@Override
public void putMessage(M message, long timeout, TimeUnit unit) throws APPCException{
- QueueMessage queueMessage = null;
-
try {
Date expirationTime = calculateExpirationTime(timeout,unit);
- queueMessage = new QueueMessage(message,expirationTime);
QueueManager queueManager = QueueManager.getInstance();
- boolean enqueueTask = queueManager.enqueueTask(queueMessage);
+ boolean enqueueTask = queueManager.enqueueTask(new QueueMessage<M>(message,expirationTime));
if(!enqueueTask){
throw new APPCException("failed to put message in queue");
}
public class QueueManager {
- private LinkedBlockingQueue<QueueMessage> queue;
+ private LinkedBlockingQueue<QueueMessage<? extends Runnable>> queue;
private MessageExpirationListener listener;
}
private void init(){
- queue = new LinkedBlockingQueue(MAX_QUEUE_SIZE);
+ queue = new LinkedBlockingQueue<QueueMessage<? extends Runnable>>(MAX_QUEUE_SIZE);
messageExecutor = Executors.newFixedThreadPool(MAX_THREAD_SIZE,Util.getThreadFactory(true));
for(int i=0;i<MAX_THREAD_SIZE;i++){
public void run() {
while (true){
try{
- QueueMessage queueMessage = queue.take();
+ QueueMessage<? extends Runnable> queueMessage = queue.take();
if(messageExpired(queueMessage)){
logger.debug("Message expired "+ queueMessage.getMessage());
if(listener != null){
this.listener = listener;
}
- public boolean enqueueTask(QueueMessage queueMessage) {
+ public boolean enqueueTask(QueueMessage<? extends Runnable> queueMessage) {
return queue.offer(queueMessage);
}
- private boolean messageExpired(QueueMessage queueMessage) {
+ private boolean messageExpired(QueueMessage<? extends Runnable> queueMessage) {
if(queueMessage.getExpirationTime() != null){
return queueMessage.getExpirationTime().getTime() < System.currentTimeMillis();
}