TaskQueueManager fixes 05/29205/2
authorJakub Dudycz <jakub.dudycz@nokia.com>
Thu, 25 Jan 2018 17:38:57 +0000 (18:38 +0100)
committerPatrick Brady <pb071s@att.com>
Thu, 25 Jan 2018 20:08:50 +0000 (20:08 +0000)
Change-Id: I995f315ebe27bf09afedab90b787366a998c083c
Issue-ID: APPC-529
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
appc-client/client-lib/src/main/java/org/onap/appc/client/impl/protocol/AsyncProtocolImpl.java

index 82626d8..8567d99 100644 (file)
@@ -70,7 +70,7 @@ class AsyncProtocolImpl implements AsyncProtocol {
      */
     private ExecutorService executorService = Executors.newSingleThreadExecutor();
 
-    private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
+    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
 
 
     AsyncProtocolImpl() {
@@ -80,6 +80,7 @@ class AsyncProtocolImpl implements AsyncProtocol {
         messageWriter = (MessageWriter) messageReader;
     }
 
+    @Override
     public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
 
         if (callback == null) {
@@ -97,6 +98,7 @@ class AsyncProtocolImpl implements AsyncProtocol {
         }
     }
 
+    @Override
     public void sendRequest(String payload, MessageContext context) throws ProtocolException {
 
         //get message to be sent to appc from payload and context
@@ -119,7 +121,7 @@ class AsyncProtocolImpl implements AsyncProtocol {
 
     public class Listener implements Runnable {
 
-
+        @Override
         public void run() {
 
             while (!isShutdown) {
@@ -128,28 +130,31 @@ class AsyncProtocolImpl implements AsyncProtocol {
                     messages = messageService.fetch();
                     LOG.debug("Successfully fetched " + messages.size() + " messages");
                 } catch (IOException e) {
-                    LOG.error("Fetching " + messages.size() + " messages failed");
+                    LOG.error("Fetching " + messages.size() + " messages failed", e);
                 }
                 for (String message : messages) {
+                    handleMessage(message);
+                }
+            }
+        }
 
-                    MessageContext context = new MessageContext();
-                    String payload = null;
-
-                    try {
-                        //get payload and context from message to be sent to core layer
-                        payload = messageReader.read(message, context);
-                        LOG.debug("Got body: " + payload);
-                        //call core layer response handler
-                        if(!isShutdown) {
-                            callback.onResponse(payload, context);
-                        }else{
-                            LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
-                                    context.getCorrelationID() + "> response ", message);
-                        }
-                    } catch (ProtocolException e) {
-                        LOG.error("Failed to read message from UEB. message is: " + message);
-                    }
+        private void handleMessage(String message) {
+            MessageContext context = new MessageContext();
+            String payload;
+
+            try {
+                //get payload and context from message to be sent to core layer
+                payload = messageReader.read(message, context);
+                LOG.debug("Got body: " + payload);
+                //call core layer response handler
+                if (!isShutdown) {
+                    callback.onResponse(payload, context);
+                } else {
+                    LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
+                        context.getCorrelationID() + "> response ", message);
                 }
+            } catch (ProtocolException e) {
+                LOG.error("Failed to read message from UEB. message is: " + message, e);
             }
         }
     }