Update apache camel from 2.x to 3.x
[aai/router-core.git] / src / main / java / org / onap / aai / event / EventBusConsumer.java
index 2da879d..bc698df 100644 (file)
@@ -2,8 +2,8 @@
  * ============LICENSE_START=======================================================
  * org.onap.aai
  * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017 Amdocs
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
  */
 package org.onap.aai.event;
 
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import org.onap.aai.event.api.EventConsumer;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.support.ScheduledPollConsumer;
 import org.onap.aai.logging.RouterCoreMsgs;
 import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.cl.mdc.MdcContext;
 
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -51,46 +43,21 @@ public class EventBusConsumer extends ScheduledPollConsumer {
 
   private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class);
   private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class);
-  private final EventBusEndpoint endpoint;
+  private final AbstractEventBusEndpoint endpoint;
 
-  private CambriaConsumer consumer;
+  private EventConsumer consumer;
 
   /**
    * EventBusConsumer Constructor.
    */
-  public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) {
+  public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) {
     super(endpoint, processor);
     super.setDelay(endpoint.getPollingDelay());
     this.endpoint = endpoint;
 
     setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize()));
 
-    String[] urls = endpoint.getUrl().split(",");
-
-    List<String> urlList = null;
-
-    if (urls != null) {
-      urlList = Arrays.asList(urls);
-    }
-
-    try {
-
-      ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder()
-          .usingHosts(urlList).onTopic(endpoint.getEventTopic())
-          .knownAs(endpoint.getGroupName(), endpoint.getGroupId());
-
-      String apiKey = endpoint.getApiKey();
-      String apiSecret = endpoint.getApiSecret();
-
-      if (apiKey != null && apiSecret != null) {
-        consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret());
-      }
-
-      consumer = consumerBuilder.build();
-
-    } catch (MalformedURLException | GeneralSecurityException e) {
-      logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e.getLocalizedMessage());
-    }
+    this.consumer = consumer;
   }
 
   /**
@@ -102,36 +69,41 @@ public class EventBusConsumer extends ScheduledPollConsumer {
   @Override
   protected int poll() throws Exception {
 
-    logger.debug("Checking for event on topic: " + endpoint.getEventTopic());
+    String topic = endpoint.getEventTopic();
+
+    logger.debug("Checking for event on topic: " + topic);
 
     int processCount = 0;
 
-    Iterable<String> messages = null;
+    try {
+      Iterable<String> messages = consumer.consumeAndCommit();
 
-    messages = consumer.fetch();
+      for (String message : messages) {
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody(message);
+        getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
+        ++processCount;
+      }
+    } catch (Exception e) {
+      logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage());
+    }
 
-    String topic = endpoint.getEventTopic();
+    logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount));
 
-    for (String message : messages) {
-      Exchange exchange = endpoint.createExchange();
-      exchange.getIn().setBody(message);
-      getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
-      ++processCount;
-    }
     return processCount;
   }
   @Override
   protected void doStop() throws Exception {
     super.doStop();
-    if (consumer != null) {
-      consumer.close();
+    if (endpoint != null) {
+      endpoint.end();
     }
   }
   @Override
   protected void doShutdown() throws Exception {
     super.doShutdown();
-    if (consumer != null) {
-      consumer.close();
+    if (endpoint != null) {
+      endpoint.end();
     }
   }
 
@@ -163,7 +135,7 @@ public class EventBusConsumer extends ScheduledPollConsumer {
         }
 
       } catch (Exception e) {
-        logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e.getLocalizedMessage());
+        logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION,e,e.getLocalizedMessage());
       } finally {
         // log exception if an exception occurred and was not handled
         if (message.getException() != null) {