Merge of new rebased code
[appc.git] / appc-event-listener / appc-event-listener-bundle / src / main / java / org / openecomp / appc / listener / impl / EventHandlerImpl.java
index 590afbe..ee9f30e 100644 (file)
  */
 
 package org.openecomp.appc.listener.impl;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.openecomp.appc.adapter.dmaap.Consumer;
-import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
-import org.openecomp.appc.adapter.dmaap.DmaapProducer;
-import org.openecomp.appc.adapter.dmaap.Producer;
-import org.openecomp.appc.adapter.dmaap.DmaapConsumer;
-import org.openecomp.appc.adapter.dmaap.DmaapProducer;
+import org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
+import org.openecomp.appc.adapter.factory.MessageService;
+import org.openecomp.appc.adapter.message.Consumer;
+import org.openecomp.appc.adapter.message.MessageAdapterFactory;
+import org.openecomp.appc.adapter.message.Producer;
 import org.openecomp.appc.listener.EventHandler;
 import org.openecomp.appc.listener.ListenerProperties;
-import org.openecomp.appc.listener.ListenerProperties.MessageService;
 import org.openecomp.appc.listener.util.Mapper;
 import org.openecomp.appc.logging.LoggingConstants;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
 import org.slf4j.MDC;
 
+import java.util.*;
+
 /**
  * This class is a wrapper for the DMaaP client provided in appc-dmaap-adapter. Its aim is to ensure that only well formed
  * messages are sent and received on DMaaP.
@@ -154,7 +150,16 @@ public class EventHandlerImpl implements EventHandler {
                LOG.info("Getting Consumer...");
                reader = getConsumer();
         }
-        for (String item : reader.fetch(READ_TIMEOUT * 1000, limit)) {
+        
+        List<String> items = null;
+        try{
+            items = reader.fetch(READ_TIMEOUT * 1000, limit);
+        }catch(Error r){
+            LOG.error("EvenHandlerImpl.getIncomingEvents",r);
+        }
+        
+        
+        for (String item : items) {
             out.add(item);
         }
         LOG.info(String.format("Read %d messages from %s as %s/%s.", out.size(), readTopic, clientName, clientId));
@@ -198,13 +203,25 @@ public class EventHandlerImpl implements EventHandler {
             LOG.error(
                 "*****We will be writing and reading to the same topic without a filter. This will cause an infinite loop.*****");
         }
-        Consumer out;
-        out = new DmaapConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
-        for (String url : pool) {
-            if (url.contains("3905") || url.contains("https")) {
-                out.useHttps(true);
-                break;
-            }
+        
+        Consumer out=null;
+        BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
+        if (ctx != null) {
+               ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+               if (svcRef != null) {
+                       try{
+                           out = ((MessageAdapterFactory) ctx.getService(svcRef)).createConsumer(pool, readTopic, clientName, clientId, filter_json, apiKey, apiSecret);
+                       }catch(Error e){
+                           //TODO:create eelf message
+                           LOG.error("EvenHandlerImp.getConsumer calling MessageAdapterFactor.createConsumer",e);
+                       }
+                       for (String url : pool) {
+                           if (url.contains("3905") || url.contains("https")) {
+                               out.useHttps(true);
+                               break;
+                           }
+                       }
+               }
         }
         return out;
     }
@@ -217,18 +234,19 @@ public class EventHandlerImpl implements EventHandler {
     protected Producer getProducer() {
         LOG.debug(String.format("Getting Producer: %s  %s", pool, readTopic));
 
-        Producer out;
-        out = new DmaapProducer(pool,writeTopics);
-
-        if (apiKey != null && apiSecret != null) {
-            out.updateCredentials(apiKey, apiSecret);
-        }
-
-        for (String url : pool) {
-            if (url.contains("3905") || url.contains("https")) {
-                out.useHttps(true);
-                break;
-            }
+        Producer out = null;
+        BundleContext ctx = FrameworkUtil.getBundle(EventHandlerImpl.class).getBundleContext();
+        if (ctx != null) {
+               ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+               if (svcRef != null) {
+                       out = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopics,apiKey, apiSecret);
+                       for (String url : pool) {
+                           if (url.contains("3905") || url.contains("https")) {
+                               out.useHttps(true);
+                               break;
+                           }
+                       }
+               }
         }
         return out;
     }
@@ -236,23 +254,11 @@ public class EventHandlerImpl implements EventHandler {
     @Override
     public void closeClients() {
        LOG.debug("Closing Consumer and Producer DMaaP clients");
-        switch (messageService) {
-            case DMaaP:
-                if (reader != null) {
-                       ((DmaapConsumer) reader).close();
-                }
-                if (producer != null) {
-                       ((DmaapProducer) producer).close();
-                }
-                break;
-            default:
-               // close DMaaP clients
-                if (reader != null) {
-                       ((DmaapConsumer) reader).close();
-                }
-                if (producer != null) {
-                       ((DmaapProducer) producer).close();
-                }
+        if (reader != null) {
+               reader.close();
+        }
+        if (producer != null) {
+               producer.close();
         }
     }