Merge of new rebased code
[appc.git] / appc-dispatcher / appc-request-handler / appc-request-handler-core / src / main / java / org / openecomp / appc / messageadapter / impl / MessageAdapterImpl.java
 
 package org.openecomp.appc.messageadapter.impl;
 
-import java.util.HashSet;
-import java.util.Properties;
 
-import org.apache.commons.lang.ObjectUtils;
-import org.openecomp.appc.adapter.dmaap.Producer;
-import org.openecomp.appc.adapter.dmaap.DmaapProducer;
+import org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
+import org.openecomp.appc.adapter.factory.MessageService;
+import org.openecomp.appc.adapter.message.MessageAdapterFactory;
+import org.openecomp.appc.adapter.message.Producer;
 import org.openecomp.appc.configuration.Configuration;
 import org.openecomp.appc.configuration.ConfigurationFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang.ObjectUtils;
 import org.openecomp.appc.domainmodel.lcm.ResponseContext;
 import org.openecomp.appc.domainmodel.lcm.VNFOperation;
 import org.openecomp.appc.messageadapter.MessageAdapter;
 import org.openecomp.appc.requesthandler.conv.Converter;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.HashSet;
+import java.util.Properties;
 
-public class MessageAdapterDmaapImpl implements MessageAdapter{
+public class MessageAdapterImpl implements MessageAdapter{
 
-    private Producer dmaapProducer;
+    private MessageService messageService;
+    private Producer producer;
     private String partition ;
     private Configuration configuration;
     private HashSet<String> pool;
@@ -48,21 +55,28 @@ public class MessageAdapterDmaapImpl implements MessageAdapter{
     private String apiKey;
     private String apiSecret;
 
-    private static final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapterDmaapImpl.class);
+    private static final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapterImpl.class);
 
     /**
-     * Initialize dmaapProducer client to post messages using configuration properties
+     * Initialize producer client to post messages using configuration properties
      */
     @Override
     public void init(){
-        this.dmaapProducer = getDmaapProducer();
+        this.producer = getProducer();
     }
-    private Producer getDmaapProducer() {
+
+    private Producer getProducer() {
         configuration = ConfigurationFactory.getConfiguration();
         Properties properties=configuration.getProperties();
         updateProperties(properties);
-        Producer producer=new DmaapProducer(pool,writeTopic);
-        producer.updateCredentials(apiKey, apiSecret);
+        
+        BundleContext ctx = FrameworkUtil.getBundle(MessageAdapterImpl.class).getBundleContext();
+        if (ctx != null) {
+               ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+               if (svcRef != null) {
+                       producer = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopic,apiKey, apiSecret);
+               }
+        }
         return producer;
     }
 
@@ -73,10 +87,13 @@ public class MessageAdapterDmaapImpl implements MessageAdapter{
         }
         pool = new HashSet<>();
         if (props != null) {
-            writeTopic = props.getProperty("dmaap.topic.write");
-            apiKey = props.getProperty("dmaap.client.key");
-            apiSecret = props.getProperty("dmaap.client.secret");
-            String hostnames = props.getProperty("dmaap.poolMembers");
+            // readTopic = props.getProperty("dmaap.topic.read");
+            writeTopic = props.getProperty("appc.LCM.topic.write");
+            apiKey = props.getProperty("appc.LCM.client.key");
+            apiSecret = props.getProperty("appc.LCM.client.secret");
+            messageService = MessageService.parse(props.getProperty("message.service.type"));
+            //  READ_TIMEOUT = Integer.valueOf(props.getProperty("dmaap.topic.read.timeout", String.valueOf(READ_TIMEOUT)));
+            String hostnames = props.getProperty("appc.LCM.poolMembers");
             if (hostnames != null && !hostnames.isEmpty()) {
                 for (String name : hostnames.split(",")) {
                     pool.add(name);
@@ -103,9 +120,9 @@ public class MessageAdapterDmaapImpl implements MessageAdapter{
             if (logger.isDebugEnabled()) {
                 logger.debug("DMaaP Response = " + jsonMessage);
             }
-            success  = dmaapProducer.post(this.partition, jsonMessage);
+            success  = producer.post(this.partition, jsonMessage);
         } catch (JsonProcessingException e1) {
-            logger.error("Error generating Jason from DMaaP message "+ e1.getMessage());
+            logger.error("Error generating Json from DMaaP message "+ e1.getMessage());
             success= false;
         }catch (Exception e){
             logger.error("Error sending message to DMaaP "+e.getMessage());