<name>aai-router-core</name>
<properties>
<checkstyle.config.location>google_checks.xml</checkstyle.config.location>
+ <event.client.version>1.3.0-SNAPSHOT</event.client.version>
<!-- Sonar Properties -->
<sonar.language>java</sonar.language>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<artifactId>common-logging</artifactId>
<version>1.2.2</version>
</dependency>
+
+ <dependency>
+ <groupId>org.onap.aai.event-client</groupId>
+ <artifactId>event-client-api</artifactId>
+ <version>${event.client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.aai.event-client</groupId>
+ <artifactId>event-client-dmaap</artifactId>
+ <version>${event.client.version}</version>
+ </dependency>
<dependency>
- <groupId>com.att.nsa</groupId>
- <artifactId>cambriaClient</artifactId>
- <version>0.0.1</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient-cache</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.onap.aai.event-client</groupId>
+ <artifactId>event-client-kafka</artifactId>
+ <version>${event.client.version}</version>
</dependency>
<dependency>
--- /dev/null
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultEndpoint;
+
+public abstract class AbstractEventBusEndpoint extends DefaultEndpoint {
+ public AbstractEventBusEndpoint() {
+ }
+
+ public AbstractEventBusEndpoint(String endpointUri, Component component) {
+ super(endpointUri, component);
+ }
+
+ public AbstractEventBusEndpoint(String endpointUri) {
+ super(endpointUri);
+ }
+
+ abstract void close();
+ abstract int getPollingDelay();
+ abstract int getPoolSize();
+ abstract String getEventTopic();
+
+}
import java.util.Map;
/**
- * Represents the component that manages {@link EventBusEndpoint}.
+ * Represents the component that manages {@link DMaaPEventBusEndpoint}.
*/
-public class EventBusComponent extends UriEndpointComponent {
+public class DMaaPEventBusComponent extends UriEndpointComponent {
- public EventBusComponent() {
- super(EventBusEndpoint.class);
+ public DMaaPEventBusComponent() {
+ super(DMaaPEventBusEndpoint.class);
}
- public EventBusComponent(CamelContext context) {
- super(context, EventBusEndpoint.class);
+ public DMaaPEventBusComponent(CamelContext context) {
+ super(context, DMaaPEventBusEndpoint.class);
}
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
throws Exception {
- Endpoint endpoint = new EventBusEndpoint(uri, this);
+ Endpoint endpoint = new DMaaPEventBusEndpoint(uri, this);
setProperties(endpoint, parameters);
return endpoint;
}
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.eclipse.jetty.util.security.Password;
+import org.onap.aai.event.client.DMaaPEventConsumer;
/**
* Represents a EventBus endpoint.
*/
-@UriEndpoint(scheme = "event-bus", syntax = "event-bus:name",
- consumerClass = EventBusConsumer.class, title = "event-bus")
-public class EventBusEndpoint extends DefaultEndpoint {
+@UriEndpoint(scheme = "dmaap-event-bus", syntax = "dmaap-event-bus:name",
+ consumerClass = EventBusConsumer.class, title = "dmaap-event-bus")
+public class DMaaPEventBusEndpoint extends AbstractEventBusEndpoint {
@UriPath
@Metadata(required = "true")
private String name;
@UriParam(label = "eventTopic")
@Metadata(required = "true")
private String eventTopic;
- @UriParam(label = "groupName")
+ @UriParam(label = "consumerGroup")
@Metadata(required = "true")
- private String groupName;
- @UriParam(label = "groupId")
+ private String consumerGroup;
+ @UriParam(label = "consumerId")
@Metadata(required = "true")
- private String groupId;
- @UriParam(label = "apiKey")
- private String apiKey;
- @UriParam(label = "apiSecret")
- private String apiSecret;
+ private String consumerId;
+ @UriParam(label = "username")
+ private String username;
+ @UriParam(label = "password")
+ private String password;
@UriParam(label = "url")
@Metadata(required = "true")
private String url;
@UriParam(label = "pollingDelay")
@Metadata(required = "true", defaultValue="30000")
private int pollingDelay = 30000;
+ @UriParam(label = "transportType")
+ @Metadata(required = "true", defaultValue="HTTPAUTH")
+ private String transportType;
+
+ private DMaaPEventConsumer dmaapConsumer;
- public EventBusEndpoint() {}
+ public DMaaPEventBusEndpoint() {}
- public EventBusEndpoint(String uri, EventBusComponent component) {
+ public DMaaPEventBusEndpoint(String uri, DMaaPEventBusComponent component) {
super(uri, component);
}
- public EventBusEndpoint(String endpointUri) {
+ public DMaaPEventBusEndpoint(String endpointUri) {
super(endpointUri);
}
+
+ @Override
+ void close() {
+ // Don't have to do anything for DMaaP
+ }
+
@Override
public Producer createProducer() throws Exception {
return new EventBusProducer(this);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new EventBusConsumer(this, processor);
+ // TODO: other overloads based on filled-in properties
+ dmaapConsumer = new DMaaPEventConsumer(url, eventTopic, Password.deobfuscate(username), Password.deobfuscate(password), consumerGroup, consumerId, 15000, 1000, transportType);
+ return new EventBusConsumer(this, processor, dmaapConsumer);
}
@Override
public boolean isSingleton() {
this.eventTopic = eventTopic;
}
- public String getGroupName() {
- return groupName;
+ public String getConsumerGroup() {
+ return consumerGroup;
}
- public void setGroupName(String groupName) {
- this.groupName = groupName;
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
}
- public String getGroupId() {
- return groupId;
+ public String getConsumerId() {
+ return consumerId;
}
- public void setGroupId(String groupId) {
- this.groupId = groupId;
+ public void setConsumerId(String consumerId) {
+ this.consumerId = consumerId;
}
- public String getApiKey() {
- return apiKey == null ? null : Password.deobfuscate(apiKey);
+ public String getUsername() {
+ return username == null ? null : Password.deobfuscate(username);
+ //return username;
}
- public void setApiKey(String apiKey) {
- this.apiKey = apiKey;
+ public void setUsername(String username) {
+ this.username = username;
}
- public String getApiSecret() {
- return apiSecret == null ? null : Password.deobfuscate(apiSecret);
+ public String getPassword() {
+ return password == null ? null : Password.deobfuscate(password);
+ //return password;
}
- public void setApiSecret(String apiSecret) {
- this.apiSecret = apiSecret;
+ public void setPassword(String password) {
+ this.password = password;
}
public int getPoolSize() {
public void setUrl(String url) {
this.url = url;
}
+
+ public String getTransportType() {
+ return transportType;
+ }
+
+ public void setTransportType(String transportType) {
+ this.transportType = transportType;
+ }
}
*/
package org.onap.aai.event;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import org.onap.aai.event.api.EventConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class);
private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class);
- private final EventBusEndpoint endpoint;
+ private final AbstractEventBusEndpoint endpoint;
- private CambriaConsumer consumer;
+ private EventConsumer consumer;
/**
* EventBusConsumer Constructor.
*/
- public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) {
+ public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) {
super(endpoint, processor);
super.setDelay(endpoint.getPollingDelay());
this.endpoint = endpoint;
setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize()));
- String[] urls = endpoint.getUrl().split(",");
-
- List<String> urlList = null;
-
- if (urls != null) {
- urlList = Arrays.asList(urls);
- }
-
- try {
-
- ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder()
- .usingHosts(urlList).onTopic(endpoint.getEventTopic())
- .knownAs(endpoint.getGroupName(), endpoint.getGroupId());
-
- String apiKey = endpoint.getApiKey();
- String apiSecret = endpoint.getApiSecret();
-
- if (apiKey != null && apiSecret != null) {
- consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret());
- }
-
- consumer = consumerBuilder.build();
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e, e.getLocalizedMessage());
- }
+ this.consumer = consumer;
}
/**
int processCount = 0;
- Iterable<String> messages = consumer.fetch();
+ //Iterable<String> messages = consumer.fetch();
+ Iterable<String> messages = consumer.consumeAndCommit();
String topic = endpoint.getEventTopic();
@Override
protected void doStop() throws Exception {
super.doStop();
- if (consumer != null) {
- consumer.close();
+ if (endpoint != null) {
+ endpoint.close();
}
}
@Override
protected void doShutdown() throws Exception {
super.doShutdown();
- if (consumer != null) {
- consumer.close();
+ if (endpoint != null) {
+ endpoint.close();
}
}
* The EventBus producer.
*/
public class EventBusProducer extends DefaultProducer {
- private EventBusEndpoint endpoint;
+ private AbstractEventBusEndpoint endpoint;
- public EventBusProducer(EventBusEndpoint endpoint) {
+ public EventBusProducer(AbstractEventBusEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
}
--- /dev/null
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+
+import java.util.Map;
+
+/**
+ * Represents the component that manages {@link KafkaEventBusEndpoint}.
+ */
+public class KafkaEventBusComponent extends UriEndpointComponent {
+
+ public KafkaEventBusComponent() {
+ super(KafkaEventBusEndpoint.class);
+ }
+
+ public KafkaEventBusComponent(CamelContext context) {
+ super(context, KafkaEventBusEndpoint.class);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
+ throws Exception {
+ Endpoint endpoint = new KafkaEventBusEndpoint(uri, this);
+ setProperties(endpoint, parameters);
+ return endpoint;
+ }
+}
--- /dev/null
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.event;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.onap.aai.event.client.KafkaEventConsumer;
+
+/**
+ * Represents a EventBus endpoint.
+ */
+@UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name",
+ consumerClass = EventBusConsumer.class, title = "kafka-event-bus")
+public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint {
+ @UriParam(label = "url")
+ @Metadata(required = "true")
+ private String url;
+ @UriParam(label = "eventTopic")
+ @Metadata(required = "true")
+ private String eventTopic;
+ @UriParam(label = "consumerGroup")
+ @Metadata(required = "true")
+ private String consumerGroup;
+ @UriParam(label = "poolSize")
+ @Metadata(required = "true", defaultValue="20")
+ private int poolSize = 20;
+ @UriParam(label = "pollingDelay")
+ @Metadata(required = "true", defaultValue="30000")
+ private int pollingDelay = 30000;
+
+ private KafkaEventConsumer consumer;
+
+ public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) {
+ super(uri, component);
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ return new EventBusProducer(this);
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup);
+ return new EventBusConsumer(this, processor, consumer);
+ }
+
+ @Override
+ public boolean isSingleton() {
+ return false;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ @Override
+ String getEventTopic() {
+ return eventTopic;
+ }
+
+ public void setEventTopic(String eventTopic) {
+ this.eventTopic = eventTopic;
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ @Override
+ int getPoolSize() {
+ return poolSize;
+ }
+
+ public void setPoolSize(int poolSize) {
+ this.poolSize = poolSize;
+ }
+
+ @Override
+ void close() {
+ consumer.close();
+ }
+
+ @Override
+ int getPollingDelay() {
+ return pollingDelay;
+ }
+
+ public void setPollingDelay(int pollingDelay) {
+ this.pollingDelay = pollingDelay;
+ }
+
+
+
+
+
+
+}
--- /dev/null
+class=org.onap.aai.event.DMaaPEventBusComponent
+++ /dev/null
-class=org.onap.aai.event.EventBusComponent
--- /dev/null
+class=org.onap.aai.event.KafkaEventBusComponent
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
-import java.util.Map;
import org.apache.camel.Endpoint;
import org.junit.Before;
@Test
public void validateProducer() throws Exception {
try {
- EventBusComponent rc = new EventBusComponent();
- EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc);
- endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
- endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+ DMaaPEventBusComponent rc = new DMaaPEventBusComponent();
+ DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc);
+ endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+ endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
endpoint.setEventTopic("eventTopic");
- endpoint.setGroupId("groupId");
- endpoint.setGroupName("gn");
+ endpoint.setConsumerId("groupId");
+ endpoint.setConsumerGroup("gn");
endpoint.setName("name");
endpoint.setPoolSize(45);
endpoint.setPollingDelay(10);
endpoint.setUrl("url");
- assertTrue(endpoint.getApiSecret().compareTo("onapSecret") == 0);
- assertTrue(endpoint.getApiKey().compareTo("onapSecret") == 0);
+ assertTrue(endpoint.getPassword().compareTo("onapSecret") == 0);
+ assertTrue(endpoint.getUsername().compareTo("onapSecret") == 0);
assertTrue(endpoint.getEventTopic().compareTo("eventTopic") == 0);
- assertTrue(endpoint.getGroupId().compareTo("groupId") == 0);
- assertTrue(endpoint.getGroupName().compareTo("gn") == 0);
+ assertTrue(endpoint.getConsumerId().compareTo("groupId") == 0);
+ assertTrue(endpoint.getConsumerGroup().compareTo("gn") == 0);
assertTrue(endpoint.getName().compareTo("name") == 0);
assertTrue(endpoint.getPoolSize() == 45);
assertTrue(endpoint.getPollingDelay() == 10);
@Test
public void validateEventBusComponent() throws Exception {
- EventBusComponent rc = new EventBusComponent(new TestCamelContext());
+ DMaaPEventBusComponent rc = new DMaaPEventBusComponent(new TestCamelContext());
Endpoint endpoint = rc.createEndpoint("http://host.com:8443/endpoint", null, new HashMap<String, Object>());
assertTrue(endpoint.getEndpointUri().equals("http://host.com:8443/endpoint"));
}
@Test
public void validateConsumer() throws Exception {
try {
- EventBusComponent rc = new EventBusComponent();
- EventBusEndpoint endpoint = new EventBusEndpoint("http://host.com:8443/endpoint", rc);
+ DMaaPEventBusComponent rc = new DMaaPEventBusComponent();
+ DMaaPEventBusEndpoint endpoint = new DMaaPEventBusEndpoint("http://host.com:8443/endpoint", rc);
- endpoint.setApiSecret("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
- endpoint.setApiKey("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+ endpoint.setPassword("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
+ endpoint.setUsername("OBF:1y0q1uvc1uum1uvg1pil1pjl1uuq1uvk1uuu1y10");
endpoint.setEventTopic("eventTopic");
- endpoint.setGroupId("groupId");
- endpoint.setGroupName("gn");
+ endpoint.setConsumerId("groupId");
+ endpoint.setConsumerGroup("gn");
endpoint.setName("name");
endpoint.setPoolSize(45);
endpoint.setPollingDelay(10);
endpoint.setUrl("url");
TestProcessor processor = new TestProcessor();
- EventBusConsumer consumer = new EventBusConsumer(endpoint, processor);
-
+ EventBusConsumer consumer = (EventBusConsumer)endpoint.createConsumer(processor);
+
}
catch (Exception ex) {
StringWriter writer = new StringWriter();