Make generic router core implementations 31/54831/1
authorGurjeet Bedi <gurjeetb@amdocs.com>
Thu, 14 Jun 2018 16:20:23 +0000 (12:20 -0400)
committerGurjeet Bedi <gurjeetb@amdocs.com>
Thu, 14 Jun 2018 16:23:01 +0000 (12:23 -0400)
Remove dependency of implementation classes of event client

Issue-ID: AAI-1228
Change-Id: I464439e225de6b32941257c090932bd2f6066dc8
Signed-off-by: Gurjeet Bedi <gurjeetb@amdocs.com>
14 files changed:
pom.xml
src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java
src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java [deleted file]
src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java [deleted file]
src/main/java/org/onap/aai/event/EventBusComponent.java [moved from src/test/java/org/onap/aai/event/TestProcessor.java with 59% similarity]
src/main/java/org/onap/aai/event/EventBusConsumer.java
src/main/java/org/onap/aai/event/EventBusEndPoint.java [new file with mode: 0644]
src/main/java/org/onap/aai/event/EventBusProducer.java
src/main/java/org/onap/aai/event/KafkaEventBusComponent.java [deleted file]
src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java [deleted file]
src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus [deleted file]
src/main/resources/META-INF/services/org/apache/camel/component/event-bus [new file with mode: 0644]
src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus [deleted file]
src/test/java/org/onap/aai/event/EventBusTest.java

diff --git a/pom.xml b/pom.xml
index 1cc8a5a..ddf47f8 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -105,6 +105,12 @@ limitations under the License.
          <version>2.20.1</version>
          <scope>test</scope>
       </dependency>
+    <dependency>
+       <groupId>org.mockito</groupId>
+       <artifactId>mockito-all</artifactId>
+       <version>1.10.19</version>
+       <scope>test</scope>
+    </dependency>
    </dependencies>
    <build>
       <defaultGoal>install</defaultGoal>
index abbfe63..9a5abee 100644 (file)
@@ -24,18 +24,12 @@ import org.apache.camel.Component;
 import org.apache.camel.impl.DefaultEndpoint;
 
 public abstract class AbstractEventBusEndpoint extends DefaultEndpoint {
-  public AbstractEventBusEndpoint() {
-  }
 
   public AbstractEventBusEndpoint(String endpointUri, Component component) {
     super(endpointUri, component);
   }
 
-  public AbstractEventBusEndpoint(String endpointUri) {
-    super(endpointUri);
-  }
-
-  abstract void close();
+  abstract void close() throws Exception;
   abstract int getPollingDelay();
   abstract int getPoolSize();
   abstract String getEventTopic();
diff --git a/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java b/src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java
deleted file mode 100644 (file)
index 696e0f4..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.event;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
-import org.apache.camel.impl.UriEndpointComponent;
-
-import java.util.Map;
-
-/**
- * Represents the component that manages {@link DMaaPEventBusEndpoint}.
- */
-public class DMaaPEventBusComponent extends UriEndpointComponent {
-
-  public DMaaPEventBusComponent() {
-    super(DMaaPEventBusEndpoint.class);
-  }
-
-  public DMaaPEventBusComponent(CamelContext context) {
-    super(context, DMaaPEventBusEndpoint.class);
-  }
-
- @Override
-  protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
-      throws Exception {
-    Endpoint endpoint = new DMaaPEventBusEndpoint(uri, this);
-    setProperties(endpoint, parameters);
-    return endpoint;
-  }
-}
diff --git a/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java b/src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java
deleted file mode 100644 (file)
index 3cc37e8..0000000
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.event;
-
-
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
-import org.eclipse.jetty.util.security.Password;
-import org.onap.aai.event.client.DMaaPEventConsumer;
-
-/**
- * Represents a EventBus endpoint.
- */
-@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name",
-    consumerClass = EventBusConsumer.class, title = "dmaap-event-bus")
-public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint {
-  @UriPath
-  @Metadata(required = "true")
-  private String name;
-
-  @UriParam(label = "eventTopic")
-  @Metadata(required = "true")
-  private String eventTopic;
-  @UriParam(label = "consumerGroup")
-  @Metadata(required = "true")
-  private String consumerGroup;
-  @UriParam(label = "consumerId")
-  @Metadata(required = "true")
-  private String consumerId;
-  @UriParam(label = "username")
-  private String username;
-  @UriParam(label = "password")
-  private String password;
-  @UriParam(label = "url")
-  @Metadata(required = "true")
-  private String url;
-  @UriParam(label = "poolSize")
-  @Metadata(required = "true", defaultValue="20")
-  private int poolSize = 20;
-  @UriParam(label = "pollingDelay")
-  @Metadata(required = "true", defaultValue="30000")
-  private int pollingDelay = 30000;
-  @UriParam(label = "transportType")
-  @Metadata(required = "true", defaultValue="HTTPAUTH")
-  private String transportType = "HTTPAUTH";
-
-  private DMaaPEventConsumer dmaapConsumer;
-  
-  public DMaaPEventBusEndpoint() {}
-
-  public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) {
-    super(uri, component);
-  }
-
-  public DMaaPEventBusEndpoint(String endpointUri) {
-    super(endpointUri);
-  }
-
-  @Override
-  void close() {
-    // Don't have to do anything for DMaaP
-  }
-
-  @Override
-  public Producer createProducer() throws Exception {
-    return new EventBusProducer(this);
-  }
-  @Override
-  public Consumer createConsumer(Processor processor) throws Exception {
-    // TODO: other overloads based on filled-in properties
-    dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, username, Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType);
-    return new EventBusConsumer(this, processor, dmaapConsumer);
-  }
-  @Override
-  public boolean isSingleton() {
-    return false;
-  }
-
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public String getEventTopic() {
-    return eventTopic;
-  }
-
-  public void setEventTopic(String eventTopic) {
-    this.eventTopic = eventTopic;
-  }
-
-  public String getConsumerGroup() {
-    return consumerGroup;
-  }
-
-  public void setConsumerGroup(String consumerGroup) {
-    this.consumerGroup = consumerGroup;
-  }
-
-  public String getConsumerId() {
-    return consumerId;
-  }
-
-  public void setConsumerId(String consumerId) {
-    this.consumerId = consumerId;
-  }
-
-  public String getUsername() {
-    return username == null ? null : Password.deobfuscate(username);
-    //return username;
-  }
-
-  public void setUsername(String username) {
-    this.username = username;
-  }
-
-  public String getPassword() {
-    return password == null ? null : Password.deobfuscate(password);
-    //return password;
-  }
-
-  public void setPassword(String password) {
-    this.password = password;
-  }
-  
-  public int getPoolSize() {
-       return poolSize;
-  }
-
-  public void setPoolSize(int poolsize) {
-       this.poolSize = poolsize;
-  }
-
-  public int getPollingDelay() {
-         return pollingDelay;
-  }
-
-  public void setPollingDelay(int pollingDelay) {
-         this.pollingDelay = pollingDelay;
-  }
-
-  public String getUrl() {
-         return url;
-  }
-
-  public void setUrl(String url) {
-    this.url = url;
-  }
-
-  public String getTransportType() {
-    return transportType;
-  }
-
-  public void setTransportType(String transportType) {
-    this.transportType = transportType;
-  }
-}
-
@@ -2,8 +2,8 @@
  * ============LICENSE_START=======================================================
  * org.onap.aai
  * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
+ * Copyright © 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2018 Amdocs
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-
 package org.onap.aai.event;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-
-public class TestProcessor implements Processor {
-
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        // TODO Auto-generated method stub
+import java.util.Map;
 
-    }
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.DefaultComponent;
 
+public class EventBusComponent extends DefaultComponent {
+       public EventBusComponent() {
+               super();
+       }
+       public EventBusComponent(CamelContext context) {
+               super(context);
+       }
+       
+       @Override
+       protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+               Endpoint endpoint = new EventBusEndPoint(uri, this);
+               setProperties(endpoint, parameters);
+               return endpoint;
+       }
 }
index da8ffb5..b189cfd 100644 (file)
@@ -31,7 +31,6 @@ import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.cl.mdc.MdcContext;
 
-import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
diff --git a/src/main/java/org/onap/aai/event/EventBusEndPoint.java b/src/main/java/org/onap/aai/event/EventBusEndPoint.java
new file mode 100644 (file)
index 0000000..621b30f
--- /dev/null
@@ -0,0 +1,108 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
+
+@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name",
+consumerClass = EventBusConsumer.class, title = "event-bus")
+public class EventBusEndPoint extends AbstractEventBusEndpoint {
+       @UriParam(label = "eventTopic")
+       @Metadata(required = "true")
+       private String eventTopic;
+       @UriParam(label = "poolSize")
+       @Metadata(required = "true", defaultValue="20")
+       private int poolSize = 20;
+       @UriParam(label = "pollingDelay")
+       @Metadata(required = "true", defaultValue="30000")
+       private int pollingDelay = 30000;
+       EventConsumer consumer; //This would be injected via bean through camel route when passed with #
+       
+       EventPublisher publisher; //This would be injected via bean through camel route when passed with #
+       
+       private Logger logger = LoggerFactory.getInstance().getLogger(EventBusEndPoint.class);
+       
+       public EventBusEndPoint(String uri, EventBusComponent component) {
+               super(uri, component);
+       }
+        
+       @Override
+       public Producer createProducer() throws Exception {
+               return new EventBusProducer(this, publisher);
+       }
+
+       @Override
+       public Consumer createConsumer(Processor processor) throws Exception {
+               return new EventBusConsumer(this, processor, consumer);
+       }
+
+       @Override
+       public boolean isSingleton() {
+               return false;
+       }
+       
+       void close() throws Exception {
+          if(consumer != null)
+                  consumer.close();
+               if(publisher != null)
+                  publisher.close();
+       }       
+       
+       public void setPoolSize(int poolSize) {
+               this.poolSize = poolSize;
+       }
+
+       public void setPollingDelay(int pollingDelay) {
+               this.pollingDelay = pollingDelay;
+       }
+
+       public int getPollingDelay() {
+       return pollingDelay;
+    }
+       public int getPoolSize() {
+               return poolSize;
+       }
+       public String getEventTopic() {
+               return eventTopic;
+       }
+
+       public void setEventTopic(String eventTopic) {
+               this.eventTopic = eventTopic;
+       }
+
+       public void setConsumer(EventConsumer consumer) {
+               this.consumer = consumer;
+       }
+
+       public void setPublisher(EventPublisher publisher) {
+               this.publisher = publisher;
+       }
+}
index dd8dbed..dfd1bfe 100644 (file)
@@ -22,17 +22,22 @@ package org.onap.aai.event;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultProducer;
+import org.onap.aai.event.api.EventPublisher;
 
 /**
  * The EventBus producer.
  */
 public class EventBusProducer extends DefaultProducer {
   private AbstractEventBusEndpoint endpoint;
+  
+  private EventPublisher publisher;
 
-  public EventBusProducer(AbstractEventBusEndpoint endpoint) {
-    super(endpoint);
-    this.endpoint = endpoint;
+  public EventBusProducer(AbstractEventBusEndpoint endpoint, EventPublisher publisher) {
+           super(endpoint);
+           this.endpoint = endpoint;
+           this.publisher = publisher;
   }
+  
   @Override
   public void process(Exchange exchange) throws Exception {
     // Publishing to event bus is currently not supported
diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java
deleted file mode 100644 (file)
index 07b4c82..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.event;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
-import org.apache.camel.impl.UriEndpointComponent;
-
-import java.util.Map;
-
-/**
- * Represents the component that manages {@link KafkaEventBusEndpoint}.
- */
-public class KafkaEventBusComponent extends UriEndpointComponent {
-
-  public KafkaEventBusComponent() {
-    super(KafkaEventBusEndpoint.class);
-  }
-
-  public KafkaEventBusComponent(CamelContext context) {
-    super(context, KafkaEventBusEndpoint.class);
-  }
-
- @Override
-  protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
-      throws Exception {
-    Endpoint endpoint = new KafkaEventBusEndpoint(uri, this);
-    setProperties(endpoint, parameters);
-    return endpoint;
-  }
-}
diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java
deleted file mode 100644 (file)
index 4194c89..0000000
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * ============LICENSE_START=======================================================
- * org.onap.aai
- * ================================================================================
- * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017-2018 Amdocs
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.aai.event;
-
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
-import org.apache.camel.spi.UriParam;
-import org.onap.aai.event.client.KafkaEventConsumer;
-
-/**
- * Represents a EventBus endpoint.
- */
-@UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name",
-    consumerClass = EventBusConsumer.class, title = "kafka-event-bus")
-public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint {
-  @UriParam(label = "url")
-  @Metadata(required = "true")
-  private String url;
-  @UriParam(label = "eventTopic")
-  @Metadata(required = "true")
-  private String eventTopic;
-  @UriParam(label = "consumerGroup")
-  @Metadata(required = "true")
-  private String consumerGroup;
-  @UriParam(label = "poolSize")
-  @Metadata(required = "true", defaultValue="20")
-  private int poolSize = 20;
-  @UriParam(label = "pollingDelay")
-  @Metadata(required = "true", defaultValue="30000")
-  private int pollingDelay = 30000;
-
-  private KafkaEventConsumer consumer;
-
-  public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) {
-    super(uri, component);
-  }
-
-  @Override
-  public Producer createProducer() throws Exception {
-    return new EventBusProducer(this);
-  }
-
-  @Override
-  public Consumer createConsumer(Processor processor) throws Exception {
-    consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup);
-    return new EventBusConsumer(this, processor, consumer);
-  }
-
-  @Override
-  public boolean isSingleton() {
-    return false;
-  }
-
-  public String getUrl() {
-    return url;
-  }
-
-  public void setUrl(String url) {
-    this.url = url;
-  }
-
-  @Override
-  String getEventTopic() {
-    return eventTopic;
-  }
-
-  public void setEventTopic(String eventTopic) {
-    this.eventTopic = eventTopic;
-  }
-
-  public String getConsumerGroup() {
-    return consumerGroup;
-  }
-
-  public void setConsumerGroup(String consumerGroup) {
-    this.consumerGroup = consumerGroup;
-  }
-
-  @Override
-  int getPoolSize() {
-    return poolSize;
-  }
-
-  public void setPoolSize(int poolSize) {
-    this.poolSize = poolSize;
-  }
-
-  @Override
-  void close() {
-    consumer.close();
-  }
-
-  @Override
-  int getPollingDelay() {
-    return pollingDelay;
-  }
-
-  public void setPollingDelay(int pollingDelay) {
-    this.pollingDelay = pollingDelay;
-  }
-
-
-
-
-
-
-}
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus
deleted file mode 100644 (file)
index f1fee02..0000000
+++ /dev/null
@@ -1 +0,0 @@
-class=org.onap.aai.event.DMaaPEventBusComponent
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus
new file mode 100644 (file)
index 0000000..f795067
--- /dev/null
@@ -0,0 +1 @@
+class=org.onap.aai.event.EventBusComponent
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus
deleted file mode 100644 (file)
index 89f243d..0000000
+++ /dev/null
@@ -1 +0,0 @@
-class=org.onap.aai.event.KafkaEventBusComponent
index 1add1c0..c7cd527 100644 (file)
@@ -23,17 +23,49 @@ package org.onap.aai.event;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.impl.MessageSupport;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
 
+import com.att.aft.dme2.hazelcast.core.Message;
+
+@RunWith(MockitoJUnitRunner.class)
 public class EventBusTest {
+       @Mock
+    public EventConsumer consumer;
+       
+       @Mock
+    public EventPublisher publisher;
+       
+       @Mock
+       public CamelContext context;
+       
+       @Mock
+       public Processor processor;
 
-    /**
+       @Mock
+       Exchange exchange;
+       
+       @Mock
+       AbstractEventBusEndpoint endPoint;
+       
+       /**
      * Test case initialization
      * 
      * @throws Exception the exception
@@ -44,77 +76,60 @@ public class EventBusTest {
 
     @Test
     public void validateProducer() throws Exception {
-        try {
-            DMaaPEventBusComponent rc = new DMaaPEventBusComponent();
-            DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc);
-            endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
-            endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
-            endpoint.setEventTopic("eventTopic");
-            endpoint.setConsumerId("groupId");
-            endpoint.setConsumerGroup("gn");
-            endpoint.setName("name");
-            endpoint.setPoolSize(45);
-            endpoint.setPollingDelay(10);
-            endpoint.setUrl("url");
-
-            assertTrue(endpoint.getPassword().compareTo("onapSecret") == 0);
-            assertTrue(endpoint.getUsername().compareTo("onapSecret") == 0);
-            assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0);
-            assertTrue(endpoint.getConsumerId().compareTo("groupId") == 0);
-            assertTrue(endpoint.getConsumerGroup().compareTo("gn") == 0);
-            assertTrue(endpoint.getName().compareTo("name") == 0);
-            assertTrue(endpoint.getPoolSize() == 45);
-            assertTrue(endpoint.getPollingDelay() == 10);
-            assertTrue(endpoint.getUrl().compareTo("url") == 0);
-            assertFalse(endpoint.isSingleton());
-
-            EventBusProducer producer = (EventBusProducer)endpoint.createProducer();
-            assertTrue(producer.getEndpoint() != null);
-        }
-        catch (Exception ex) {
-            StringWriter writer = new StringWriter();
-            PrintWriter printWriter = new PrintWriter( writer );
-            ex.printStackTrace( printWriter );
-            printWriter.flush();
-            System.out.println(writer.toString());
-            throw ex;
-        }
+       EventBusComponent rc = new EventBusComponent();
+        EventBusEndPoint endpoint = new EventBusEndPoint("http://host.com:8443/endpoint", rc);
+        endpoint.setEventTopic("eventTopic");
+        endpoint.setPublisher(publisher);
+        endpoint.setPoolSize(45);
+        endpoint.setPollingDelay(10);
+        
+        assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0);
+        assertTrue(endpoint.getPoolSize() == 45);
+        assertTrue(endpoint.getPollingDelay() == 10);
+        assertFalse(endpoint.isSingleton());
+        EventBusProducer producer = (EventBusProducer)endpoint.createProducer();
+        assertTrue(producer.getEndpoint() != null);
+               endpoint.close();
     }
     
     @Test
     public void validateEventBusComponent() throws Exception {
-        DMaaPEventBusComponent rc = new DMaaPEventBusComponent(new TestCamelContext());
+        EventBusComponent rc = new EventBusComponent(context);
         Endpoint endpoint = rc.createEndpoint("http://host.com:8443/endpoint", null, new HashMap<String, Object>());
         assertTrue(endpoint.getEndpointUri().equals("http://host.com:8443/endpoint"));
     }
     
     @Test
     public void validateConsumer() throws Exception {
-        try {
-            DMaaPEventBusComponent rc = new DMaaPEventBusComponent();
-            DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc);
-
-            endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
-            endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
-            endpoint.setEventTopic("eventTopic");
-            endpoint.setConsumerId("groupId");
-            endpoint.setConsumerGroup("gn");
-            endpoint.setName("name");
-            endpoint.setPoolSize(45);
-            endpoint.setPollingDelay(10);
-            endpoint.setUrl("url");
-
-            TestProcessor processor = new TestProcessor();
-            EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor);
-
-        }
-        catch (Exception ex) {
-            StringWriter writer = new StringWriter();
-            PrintWriter printWriter = new PrintWriter( writer );
-            ex.printStackTrace( printWriter );
-            printWriter.flush();
-            System.out.println(writer.toString());
-            throw ex;
-        }
+        EventBusComponent rc = new EventBusComponent();
+        EventBusEndPoint endpoint = new EventBusEndPoint("http://host.com:8443/endpoint", rc);
+        
+        endpoint.setConsumer(consumer);
+        endpoint.setEventTopic("eventTopic");
+        endpoint.setPoolSize(45);
+        endpoint.setPollingDelay(10);
+        
+        assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0);
+        assertTrue(endpoint.getPoolSize() == 45);
+        assertTrue(endpoint.getPollingDelay() == 10);
+        assertFalse(endpoint.isSingleton());
+        
+        EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor);
+    }
+    
+    @Test
+    public void validateConsumerPoll() throws Exception {
+       MessageSupport me = new DefaultMessage(context);
+       List<String> list = new ArrayList<>();
+       list.add("Message 1");
+       list.add("Message 2");
+       
+       Mockito.when(consumer.consumeAndCommit()).thenReturn(list);
+        Mockito.when(endPoint.createExchange()).thenReturn(exchange);
+        Mockito.when(exchange.getIn()).thenReturn(me);
+        Mockito.when(exchange.getOut()).thenReturn(me);
+        
+        EventBusConsumer busConsumer = new EventBusConsumer(endPoint, processor, consumer);
+        int messages = busConsumer.poll();
     }
 }