Client to use controller type for DMaaP partition
[appc.git] / appc-client / client-lib / src / main / java / org / onap / appc / client / impl / protocol / AsyncProtocolImpl.java
index 82626d8..9f39b9f 100644 (file)
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * ONAP : APPC
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Copyright (C) 2017 Amdocs
  * =============================================================================
@@ -18,7 +18,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * 
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
  * ============LICENSE_END=========================================================
  */
 
@@ -70,8 +69,9 @@ class AsyncProtocolImpl implements AsyncProtocol {
      */
     private ExecutorService executorService = Executors.newSingleThreadExecutor();
 
-    private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
+    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
 
+    private String controllerType = null;
 
     AsyncProtocolImpl() {
 
@@ -80,14 +80,17 @@ class AsyncProtocolImpl implements AsyncProtocol {
         messageWriter = (MessageWriter) messageReader;
     }
 
+    @Override
     public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
 
         if (callback == null) {
             throw new ProtocolException("Callback param should not be null!");
         }
         this.callback = callback;
-
-        try {
+        
+        controllerType = props.getProperty(UEBPropertiesKeys.CONTROLLER_TYPE);
+        
+        try {            
             messageService.init(props);
             //get message bus listener thread
             //start the thread after initializing services
@@ -97,8 +100,13 @@ class AsyncProtocolImpl implements AsyncProtocol {
         }
     }
 
+    @Override
     public void sendRequest(String payload, MessageContext context) throws ProtocolException {
-
+        if (controllerType != null && controllerType.length()!= 0 && (!controllerType.equals("APPC")))
+        {
+            context.setPartiton(controllerType);
+        }
+        
         //get message to be sent to appc from payload and context
         String message = messageWriter.write(payload, context);
         try {
@@ -119,7 +127,7 @@ class AsyncProtocolImpl implements AsyncProtocol {
 
     public class Listener implements Runnable {
 
-
+        @Override
         public void run() {
 
             while (!isShutdown) {
@@ -128,28 +136,31 @@ class AsyncProtocolImpl implements AsyncProtocol {
                     messages = messageService.fetch();
                     LOG.debug("Successfully fetched " + messages.size() + " messages");
                 } catch (IOException e) {
-                    LOG.error("Fetching " + messages.size() + " messages failed");
+                    LOG.error("Fetching " + messages.size() + " messages failed", e);
                 }
                 for (String message : messages) {
+                    handleMessage(message);
+                }
+            }
+        }
 
-                    MessageContext context = new MessageContext();
-                    String payload = null;
-
-                    try {
-                        //get payload and context from message to be sent to core layer
-                        payload = messageReader.read(message, context);
-                        LOG.debug("Got body: " + payload);
-                        //call core layer response handler
-                        if(!isShutdown) {
-                            callback.onResponse(payload, context);
-                        }else{
-                            LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
-                                    context.getCorrelationID() + "> response ", message);
-                        }
-                    } catch (ProtocolException e) {
-                        LOG.error("Failed to read message from UEB. message is: " + message);
-                    }
+        private void handleMessage(String message) {
+            MessageContext context = new MessageContext();
+            String payload;
+
+            try {
+                //get payload and context from message to be sent to core layer
+                payload = messageReader.read(message, context);
+                LOG.debug("Got body: " + payload);
+                //call core layer response handler
+                if (!isShutdown) {
+                    callback.onResponse(payload, context);
+                } else {
+                    LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
+                        context.getCorrelationID() + "> response ", message);
                 }
+            } catch (ProtocolException e) {
+                LOG.error("Failed to read message from UEB. message is: " + message, e);
             }
         }
     }