Remove cambria client dependency 91/49191/1
authorDaniel Silverthorn <daniel.silverthorn@amdocs.com>
Fri, 25 May 2018 20:09:38 +0000 (16:09 -0400)
committerDaniel Silverthorn <daniel.silverthorn@amdocs.com>
Fri, 25 May 2018 20:10:36 +0000 (16:10 -0400)
Change-Id: I9760839ae44df851640b271d032a39f4bb3691c2
Issue-ID: AAI-1182
Signed-off-by: Daniel Silverthorn <daniel.silverthorn@amdocs.com>
12 files changed:
pom.xml
src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java [new file with mode: 0644]
src/main/java/org/onap/aai/event/DMaaPEventBusComponent.java [moved from src/main/java/org/onap/aai/event/EventBusComponent.java with 78% similarity]
src/main/java/org/onap/aai/event/DMaaPEventBusEndpoint.java [moved from src/main/java/org/onap/aai/event/EventBusEndpoint.java with 58% similarity]
src/main/java/org/onap/aai/event/EventBusConsumer.java
src/main/java/org/onap/aai/event/EventBusProducer.java
src/main/java/org/onap/aai/event/KafkaEventBusComponent.java [new file with mode: 0644]
src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java [new file with mode: 0644]
src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus [new file with mode: 0644]
src/main/resources/META-INF/services/org/apache/camel/component/event-bus [deleted file]
src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus [new file with mode: 0644]
src/test/java/org/onap/aai/event/EventBusTest.java

diff --git a/pom.xml b/pom.xml
index 5ddef51..1cc8a5a 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -34,6 +34,7 @@ limitations under the License.
    <name>aai-router-core</name>
    <properties>
       <checkstyle.config.location>google_checks.xml</checkstyle.config.location>
+      <event.client.version>1.3.0-SNAPSHOT</event.client.version>
       <!-- Sonar Properties -->
       <sonar.language>java</sonar.language>
       <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
@@ -60,20 +61,21 @@ limitations under the License.
          <artifactId>common-logging</artifactId>
          <version>1.2.2</version>
       </dependency>
+
+      <dependency>
+          <groupId>org.onap.aai.event-client</groupId>
+          <artifactId>event-client-api</artifactId>
+          <version>${event.client.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.onap.aai.event-client</groupId>
+          <artifactId>event-client-dmaap</artifactId>
+          <version>${event.client.version}</version>
+      </dependency>
       <dependency>
-         <groupId>com.att.nsa</groupId>
-         <artifactId>cambriaClient</artifactId>
-         <version>0.0.1</version>
-        <exclusions>
-                <exclusion>
-                  <groupId>org.apache.httpcomponents</groupId>
-                 <artifactId>httpclient</artifactId>
-                </exclusion>
-                <exclusion>
-                  <groupId>org.apache.httpcomponents</groupId>
-                 <artifactId>httpclient-cache</artifactId>
-                </exclusion>
-            </exclusions>
+          <groupId>org.onap.aai.event-client</groupId>
+          <artifactId>event-client-kafka</artifactId>
+          <version>${event.client.version}</version>
       </dependency>
 
       <dependency>
diff --git a/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java b/src/main/java/org/onap/aai/event/AbstractEventBusEndpoint.java
new file mode 100644 (file)
index 0000000..abbfe63
--- /dev/null
@@ -0,0 +1,43 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultEndpoint;
+
+public abstract class AbstractEventBusEndpoint extends DefaultEndpoint {
+  public AbstractEventBusEndpoint() {
+  }
+
+  public AbstractEventBusEndpoint(String endpointUri, Component component) {
+    super(endpointUri, component);
+  }
+
+  public AbstractEventBusEndpoint(String endpointUri) {
+    super(endpointUri);
+  }
+
+  abstract void close();
+  abstract int getPollingDelay();
+  abstract int getPoolSize();
+  abstract String getEventTopic();
+
+}
@@ -27,22 +27,22 @@ import org.apache.camel.impl.UriEndpointComponent;
 import java.util.Map;
 
 /**
- * Represents the component that manages {@link EventBusEndpoint}.
+ * Represents the component that manages {@link DMaaPEventBusEndpoint}.
  */
-public class EventBusComponent extends UriEndpointComponent {
+public class DMaaPEventBusComponent extends UriEndpointComponent {
 
-  public EventBusComponent() {
-    super(EventBusEndpoint.class);
+  public DMaaPEventBusComponent() {
+    super(DMaaPEventBusEndpoint.class);
   }
 
-  public EventBusComponent(CamelContext context) {
-    super(context, EventBusEndpoint.class);
+  public DMaaPEventBusComponent(CamelContext context) {
+    super(context, DMaaPEventBusEndpoint.class);
   }
 
  @Override
   protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
       throws Exception {
-    Endpoint endpoint = new EventBusEndpoint(uri, this);
+    Endpoint endpoint = new DMaaPEventBusEndpoint(uri, this);
     setProperties(endpoint, parameters);
     return endpoint;
   }
@@ -24,19 +24,19 @@ package org.onap.aai.event;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.eclipse.jetty.util.security.Password;
+import org.onap.aai.event.client.DMaaPEventConsumer;
 
 /**
  * Represents a EventBus endpoint.
  */
-@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name",
-    consumerClass = EventBusConsumer.class, title = "event-bus")
-public class EventBusEndpoint extends DefaultEndpoint {
+@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name",
+    consumerClass = EventBusConsumer.class, title = "dmaap-event-bus")
+public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint {
   @UriPath
   @Metadata(required = "true")
   private String name;
@@ -44,16 +44,16 @@ public class EventBusEndpoint extends DefaultEndpoint {
   @UriParam(label = "eventTopic")
   @Metadata(required = "true")
   private String eventTopic;
-  @UriParam(label = "groupName")
+  @UriParam(label = "consumerGroup")
   @Metadata(required = "true")
-  private String groupName;
-  @UriParam(label = "groupId")
+  private String consumerGroup;
+  @UriParam(label = "consumerId")
   @Metadata(required = "true")
-  private String groupId;
-  @UriParam(label = "apiKey")
-  private String apiKey;
-  @UriParam(label = "apiSecret")
-  private String apiSecret;
+  private String consumerId;
+  @UriParam(label = "username")
+  private String username;
+  @UriParam(label = "password")
+  private String password;
   @UriParam(label = "url")
   @Metadata(required = "true")
   private String url;
@@ -63,23 +63,36 @@ public class EventBusEndpoint extends DefaultEndpoint {
   @UriParam(label = "pollingDelay")
   @Metadata(required = "true", defaultValue="30000")
   private int pollingDelay = 30000;
+  @UriParam(label = "transportType")
+  @Metadata(required = "true", defaultValue="HTTPAUTH")
+  private String transportType;
+
+  private DMaaPEventConsumer dmaapConsumer;
   
-  public EventBusEndpoint() {}
+  public DMaaPEventBusEndpoint() {}
 
-  public EventBusEndpoint(String uri, EventBusComponent component) {
+  public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) {
     super(uri, component);
   }
 
-  public EventBusEndpoint(String endpointUri) {
+  public DMaaPEventBusEndpoint(String endpointUri) {
     super(endpointUri);
   }
+
+  @Override
+  void close() {
+    // Don't have to do anything for DMaaP
+  }
+
   @Override
   public Producer createProducer() throws Exception {
     return new EventBusProducer(this);
   }
   @Override
   public Consumer createConsumer(Processor processor) throws Exception {
-    return new EventBusConsumer(this, processor);
+    // TODO: other overloads based on filled-in properties
+    dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, Password.deobfuscate(username), Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType);
+    return new EventBusConsumer(this, processor, dmaapConsumer);
   }
   @Override
   public boolean isSingleton() {
@@ -102,36 +115,38 @@ public class EventBusEndpoint extends DefaultEndpoint {
     this.eventTopic = eventTopic;
   }
 
-  public String getGroupName() {
-    return groupName;
+  public String getConsumerGroup() {
+    return consumerGroup;
   }
 
-  public void setGroupName(String groupName) {
-    this.groupName = groupName;
+  public void setConsumerGroup(String consumerGroup) {
+    this.consumerGroup = consumerGroup;
   }
 
-  public String getGroupId() {
-    return groupId;
+  public String getConsumerId() {
+    return consumerId;
   }
 
-  public void setGroupId(String groupId) {
-    this.groupId = groupId;
+  public void setConsumerId(String consumerId) {
+    this.consumerId = consumerId;
   }
 
-  public String getApiKey() {
-    return apiKey == null ? null : Password.deobfuscate(apiKey);
+  public String getUsername() {
+    return username == null ? null : Password.deobfuscate(username);
+    //return username;
   }
 
-  public void setApiKey(String apiKey) {
-    this.apiKey = apiKey;
+  public void setUsername(String username) {
+    this.username = username;
   }
 
-  public String getApiSecret() {
-    return apiSecret == null ? null : Password.deobfuscate(apiSecret);
+  public String getPassword() {
+    return password == null ? null : Password.deobfuscate(password);
+    //return password;
   }
 
-  public void setApiSecret(String apiSecret) {
-    this.apiSecret = apiSecret;
+  public void setPassword(String password) {
+    this.password = password;
   }
   
   public int getPoolSize() {
@@ -157,5 +172,13 @@ public class EventBusEndpoint extends DefaultEndpoint {
   public void setUrl(String url) {
     this.url = url;
   }
+
+  public String getTransportType() {
+    return transportType;
+  }
+
+  public void setTransportType(String transportType) {
+    this.transportType = transportType;
+  }
 }
 
index a9d5478..e795e2c 100644 (file)
@@ -20,9 +20,7 @@
  */
 package org.onap.aai.event;
 
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import org.onap.aai.event.api.EventConsumer;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -33,10 +31,6 @@ import org.onap.aai.cl.api.Logger;
 import org.onap.aai.cl.eelf.LoggerFactory;
 import org.onap.aai.cl.mdc.MdcContext;
 
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -49,46 +43,21 @@ public class EventBusConsumer extends ScheduledPollConsumer {
 
   private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class);
   private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class);
-  private final EventBusEndpoint endpoint;
+  private final AbstractEventBusEndpoint endpoint;
 
-  private CambriaConsumer consumer;
+  private EventConsumer consumer;
 
   /**
    * EventBusConsumer Constructor.
    */
-  public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) {
+  public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) {
     super(endpoint, processor);
     super.setDelay(endpoint.getPollingDelay());
     this.endpoint = endpoint;
 
     setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize()));
 
-    String[] urls = endpoint.getUrl().split(",");
-
-    List<String> urlList = null;
-
-    if (urls != null) {
-      urlList = Arrays.asList(urls);
-    }
-
-    try {
-
-      ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder()
-          .usingHosts(urlList).onTopic(endpoint.getEventTopic())
-          .knownAs(endpoint.getGroupName(), endpoint.getGroupId());
-
-      String apiKey = endpoint.getApiKey();
-      String apiSecret = endpoint.getApiSecret();
-
-      if (apiKey != null && apiSecret != null) {
-        consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret());
-      }
-
-      consumer = consumerBuilder.build();
-
-    } catch (MalformedURLException | GeneralSecurityException e) {
-      logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e, e.getLocalizedMessage());
-    }
+    this.consumer = consumer;
   }
 
   /**
@@ -104,7 +73,8 @@ public class EventBusConsumer extends ScheduledPollConsumer {
 
     int processCount = 0;
 
-    Iterable<String> messages = consumer.fetch();
+    //Iterable<String> messages = consumer.fetch();
+    Iterable<String> messages = consumer.consumeAndCommit();
 
     String topic = endpoint.getEventTopic();
 
@@ -119,15 +89,15 @@ public class EventBusConsumer extends ScheduledPollConsumer {
   @Override
   protected void doStop() throws Exception {
     super.doStop();
-    if (consumer != null) {
-      consumer.close();
+    if (endpoint != null) {
+      endpoint.close();
     }
   }
   @Override
   protected void doShutdown() throws Exception {
     super.doShutdown();
-    if (consumer != null) {
-      consumer.close();
+    if (endpoint != null) {
+      endpoint.close();
     }
   }
 
index ec2a0da..dd8dbed 100644 (file)
@@ -27,9 +27,9 @@ import org.apache.camel.impl.DefaultProducer;
  * The EventBus producer.
  */
 public class EventBusProducer extends DefaultProducer {
-  private EventBusEndpoint endpoint;
+  private AbstractEventBusEndpoint endpoint;
 
-  public EventBusProducer(EventBusEndpoint endpoint) {
+  public EventBusProducer(AbstractEventBusEndpoint endpoint) {
     super(endpoint);
     this.endpoint = endpoint;
   }
diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java b/src/main/java/org/onap/aai/event/KafkaEventBusComponent.java
new file mode 100644 (file)
index 0000000..07b4c82
--- /dev/null
@@ -0,0 +1,49 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+
+import java.util.Map;
+
+/**
+ * Represents the component that manages {@link KafkaEventBusEndpoint}.
+ */
+public class KafkaEventBusComponent extends UriEndpointComponent {
+
+  public KafkaEventBusComponent() {
+    super(KafkaEventBusEndpoint.class);
+  }
+
+  public KafkaEventBusComponent(CamelContext context) {
+    super(context, KafkaEventBusEndpoint.class);
+  }
+
+ @Override
+  protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
+      throws Exception {
+    Endpoint endpoint = new KafkaEventBusEndpoint(uri, this);
+    setProperties(endpoint, parameters);
+    return endpoint;
+  }
+}
diff --git a/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java b/src/main/java/org/onap/aai/event/KafkaEventBusEndpoint.java
new file mode 100644 (file)
index 0000000..4194c89
--- /dev/null
@@ -0,0 +1,129 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.onap.aai.event.client.KafkaEventConsumer;
+
+/**
+ * Represents a EventBus endpoint.
+ */
+@UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name",
+    consumerClass = EventBusConsumer.class, title = "kafka-event-bus")
+public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint {
+  @UriParam(label = "url")
+  @Metadata(required = "true")
+  private String url;
+  @UriParam(label = "eventTopic")
+  @Metadata(required = "true")
+  private String eventTopic;
+  @UriParam(label = "consumerGroup")
+  @Metadata(required = "true")
+  private String consumerGroup;
+  @UriParam(label = "poolSize")
+  @Metadata(required = "true", defaultValue="20")
+  private int poolSize = 20;
+  @UriParam(label = "pollingDelay")
+  @Metadata(required = "true", defaultValue="30000")
+  private int pollingDelay = 30000;
+
+  private KafkaEventConsumer consumer;
+
+  public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) {
+    super(uri, component);
+  }
+
+  @Override
+  public Producer createProducer() throws Exception {
+    return new EventBusProducer(this);
+  }
+
+  @Override
+  public Consumer createConsumer(Processor processor) throws Exception {
+    consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup);
+    return new EventBusConsumer(this, processor, consumer);
+  }
+
+  @Override
+  public boolean isSingleton() {
+    return false;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  @Override
+  String getEventTopic() {
+    return eventTopic;
+  }
+
+  public void setEventTopic(String eventTopic) {
+    this.eventTopic = eventTopic;
+  }
+
+  public String getConsumerGroup() {
+    return consumerGroup;
+  }
+
+  public void setConsumerGroup(String consumerGroup) {
+    this.consumerGroup = consumerGroup;
+  }
+
+  @Override
+  int getPoolSize() {
+    return poolSize;
+  }
+
+  public void setPoolSize(int poolSize) {
+    this.poolSize = poolSize;
+  }
+
+  @Override
+  void close() {
+    consumer.close();
+  }
+
+  @Override
+  int getPollingDelay() {
+    return pollingDelay;
+  }
+
+  public void setPollingDelay(int pollingDelay) {
+    this.pollingDelay = pollingDelay;
+  }
+
+
+
+
+
+
+}
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/dmaap-event-bus
new file mode 100644 (file)
index 0000000..f1fee02
--- /dev/null
@@ -0,0 +1 @@
+class=org.onap.aai.event.DMaaPEventBusComponent
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/event-bus
deleted file mode 100644 (file)
index f795067..0000000
+++ /dev/null
@@ -1 +0,0 @@
-class=org.onap.aai.event.EventBusComponent
diff --git a/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus b/src/main/resources/META-INF/services/org/apache/camel/component/kafka-event-bus
new file mode 100644 (file)
index 0000000..89f243d
--- /dev/null
@@ -0,0 +1 @@
+class=org.onap.aai.event.KafkaEventBusComponent
index 8d294cd..1add1c0 100644 (file)
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertTrue;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.HashMap;
-import java.util.Map;
 
 import org.apache.camel.Endpoint;
 import org.junit.Before;
@@ -46,23 +45,23 @@ public class EventBusTest {
     @Test
     public void validateProducer() throws Exception {
         try {
-            EventBusComponent rc = new EventBusComponent();            
-            EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc);
-            endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
-            endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+            DMaaPEventBusComponent rc = new DMaaPEventBusComponent();
+            DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc);
+            endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+            endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
             endpoint.setEventTopic("eventTopic");
-            endpoint.setGroupId("groupId");
-            endpoint.setGroupName("gn");
+            endpoint.setConsumerId("groupId");
+            endpoint.setConsumerGroup("gn");
             endpoint.setName("name");
             endpoint.setPoolSize(45);
             endpoint.setPollingDelay(10);
             endpoint.setUrl("url");
 
-            assertTrue(endpoint.getApiSecret().compareTo("onapSecret") == 0);
-            assertTrue(endpoint.getApiKey().compareTo("onapSecret") == 0);
+            assertTrue(endpoint.getPassword().compareTo("onapSecret") == 0);
+            assertTrue(endpoint.getUsername().compareTo("onapSecret") == 0);
             assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0);
-            assertTrue(endpoint.getGroupId().compareTo("groupId") == 0);
-            assertTrue(endpoint.getGroupName().compareTo("gn") == 0);
+            assertTrue(endpoint.getConsumerId().compareTo("groupId") == 0);
+            assertTrue(endpoint.getConsumerGroup().compareTo("gn") == 0);
             assertTrue(endpoint.getName().compareTo("name") == 0);
             assertTrue(endpoint.getPoolSize() == 45);
             assertTrue(endpoint.getPollingDelay() == 10);
@@ -84,7 +83,7 @@ public class EventBusTest {
     
     @Test
     public void validateEventBusComponent() throws Exception {
-        EventBusComponent rc = new EventBusComponent(new TestCamelContext());
+        DMaaPEventBusComponent rc = new DMaaPEventBusComponent(new TestCamelContext());
         Endpoint endpoint = rc.createEndpoint("http://host.com:8443/endpoint", null, new HashMap<String, Object>());
         assertTrue(endpoint.getEndpointUri().equals("http://host.com:8443/endpoint"));
     }
@@ -92,22 +91,22 @@ public class EventBusTest {
     @Test
     public void validateConsumer() throws Exception {
         try {
-            EventBusComponent rc = new EventBusComponent();
-            EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc);
+            DMaaPEventBusComponent rc = new DMaaPEventBusComponent();
+            DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc);
 
-            endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
-            endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+            endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+            endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
             endpoint.setEventTopic("eventTopic");
-            endpoint.setGroupId("groupId");
-            endpoint.setGroupName("gn");
+            endpoint.setConsumerId("groupId");
+            endpoint.setConsumerGroup("gn");
             endpoint.setName("name");
             endpoint.setPoolSize(45);
             endpoint.setPollingDelay(10);
             endpoint.setUrl("url");
 
             TestProcessor processor = new TestProcessor();
-            EventBusConsumer consumer = new EventBusConsumer(endpoint, processor);
-            
+            EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor);
+
         }
         catch (Exception ex) {
             StringWriter writer = new StringWriter();