import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.eclipse.jetty.util.security.Password;
+import org.onap.aai.event.client.DMaaPEventConsumer;
/**
* Represents a EventBus endpoint.
*/
-@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name",
- consumerClass = EventBusConsumer.class, title = "event-bus")
-public class EventBusEndpoint extends DefaultEndpoint {
+@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name",
+ consumerClass = EventBusConsumer.class, title = "dmaap-event-bus")
+public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint {
@UriPath
@Metadata(required = "true")
private String name;
@UriParam(label = "eventTopic")
@Metadata(required = "true")
private String eventTopic;
- @UriParam(label = "groupName")
+ @UriParam(label = "consumerGroup")
@Metadata(required = "true")
- private String groupName;
- @UriParam(label = "groupId")
+ private String consumerGroup;
+ @UriParam(label = "consumerId")
@Metadata(required = "true")
- private String groupId;
- @UriParam(label = "apiKey")
- private String apiKey;
- @UriParam(label = "apiSecret")
- private String apiSecret;
+ private String consumerId;
+ @UriParam(label = "username")
+ private String username;
+ @UriParam(label = "password")
+ private String password;
@UriParam(label = "url")
@Metadata(required = "true")
private String url;
@UriParam(label = "pollingDelay")
@Metadata(required = "true", defaultValue="30000")
private int pollingDelay = 30000;
+ @UriParam(label = "transportType")
+ @Metadata(required = "true", defaultValue="HTTPAUTH")
+ private String transportType;
+
+ private DMaaPEventConsumer dmaapConsumer;
- public EventBusEndpoint() {}
+ public DMaaPEventBusEndpoint() {}
- public EventBusEndpoint(String uri, EventBusComponent component) {
+ public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) {
super(uri, component);
}
- public EventBusEndpoint(String endpointUri) {
+ public DMaaPEventBusEndpoint(String endpointUri) {
super(endpointUri);
}
+
+ @Override
+ void close() {
+ // Don't have to do anything for DMaaP
+ }
+
@Override
public Producer createProducer() throws Exception {
return new EventBusProducer(this);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new EventBusConsumer(this, processor);
+ // TODO: other overloads based on filled-in properties
+ dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, Password.deobfuscate(username), Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType);
+ return new EventBusConsumer(this, processor, dmaapConsumer);
}
@Override
public boolean isSingleton() {
this.eventTopic = eventTopic;
}
- public String getGroupName() {
- return groupName;
+ public String getConsumerGroup() {
+ return consumerGroup;
}
- public void setGroupName(String groupName) {
- this.groupName = groupName;
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
}
- public String getGroupId() {
- return groupId;
+ public String getConsumerId() {
+ return consumerId;
}
- public void setGroupId(String groupId) {
- this.groupId = groupId;
+ public void setConsumerId(String consumerId) {
+ this.consumerId = consumerId;
}
- public String getApiKey() {
- return apiKey == null ? null : Password.deobfuscate(apiKey);
+ public String getUsername() {
+ return username == null ? null : Password.deobfuscate(username);
+ //return username;
}
- public void setApiKey(String apiKey) {
- this.apiKey = apiKey;
+ public void setUsername(String username) {
+ this.username = username;
}
- public String getApiSecret() {
- return apiSecret == null ? null : Password.deobfuscate(apiSecret);
+ public String getPassword() {
+ return password == null ? null : Password.deobfuscate(password);
+ //return password;
}
- public void setApiSecret(String apiSecret) {
- this.apiSecret = apiSecret;
+ public void setPassword(String password) {
+ this.password = password;
}
public int getPoolSize() {
public void setUrl(String url) {
this.url = url;
}
+
+ public String getTransportType() {
+ return transportType;
+ }
+
+ public void setTransportType(String transportType) {
+ this.transportType = transportType;
+ }
}