* ============LICENSE_START=======================================================
* org.onap.aai
* ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * Copyright © 2017 Amdocs
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
*/
package org.onap.aai.event;
-import com.att.nsa.cambria.client.CambriaClientBuilders;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
-import com.att.nsa.cambria.client.CambriaConsumer;
+import org.onap.aai.event.api.EventConsumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.support.ScheduledPollConsumer;
import org.onap.aai.logging.RouterCoreMsgs;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
private Logger logger = LoggerFactory.getInstance().getLogger(EventBusConsumer.class);
private Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EventBusConsumer.class);
- private final EventBusEndpoint endpoint;
+ private final AbstractEventBusEndpoint endpoint;
- private CambriaConsumer consumer;
+ private EventConsumer consumer;
/**
* EventBusConsumer Constructor.
*/
- public EventBusConsumer(EventBusEndpoint endpoint, Processor processor) {
+ public EventBusConsumer(AbstractEventBusEndpoint endpoint, Processor processor, EventConsumer consumer) {
super(endpoint, processor);
super.setDelay(endpoint.getPollingDelay());
this.endpoint = endpoint;
setScheduledExecutorService(new ScheduledThreadPoolExecutor(endpoint.getPoolSize()));
- String[] urls = endpoint.getUrl().split(",");
-
- List<String> urlList = null;
-
- if (urls != null) {
- urlList = Arrays.asList(urls);
- }
-
- try {
-
- ConsumerBuilder consumerBuilder = new CambriaClientBuilders.ConsumerBuilder()
- .usingHosts(urlList).onTopic(endpoint.getEventTopic())
- .knownAs(endpoint.getGroupName(), endpoint.getGroupId());
-
- String apiKey = endpoint.getApiKey();
- String apiSecret = endpoint.getApiSecret();
-
- if (apiKey != null && apiSecret != null) {
- consumerBuilder.authenticatedBy(endpoint.getApiKey(), endpoint.getApiSecret());
- }
-
- consumer = consumerBuilder.build();
-
- } catch (MalformedURLException | GeneralSecurityException e) {
- logger.error(RouterCoreMsgs.EVENT_CONSUMER_CREATION_EXCEPTION, e.getLocalizedMessage());
- }
+ this.consumer = consumer;
}
/**
@Override
protected int poll() throws Exception {
- logger.debug("Checking for event on topic: " + endpoint.getEventTopic());
+ String topic = endpoint.getEventTopic();
+
+ logger.debug("Checking for event on topic: " + topic);
int processCount = 0;
- Iterable<String> messages = null;
+ try {
+ Iterable<String> messages = consumer.consumeAndCommit();
- messages = consumer.fetch();
+ for (String message : messages) {
+ Exchange exchange = endpoint.createExchange();
+ exchange.getIn().setBody(message);
+ getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
+ ++processCount;
+ }
+ } catch (Exception e) {
+ logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e, e.getLocalizedMessage());
+ }
- String topic = endpoint.getEventTopic();
+ logger.debug(RouterCoreMsgs.PROCESS_EVENT, topic, Integer.toString(processCount));
- for (String message : messages) {
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody(message);
- getScheduledExecutorService().submit(new EventProcessor(exchange, topic));
- ++processCount;
- }
return processCount;
}
-
+ @Override
protected void doStop() throws Exception {
super.doStop();
- if (consumer != null) {
- consumer.close();
+ if (endpoint != null) {
+ endpoint.end();
}
}
-
+ @Override
protected void doShutdown() throws Exception {
super.doShutdown();
- if (consumer != null) {
- consumer.close();
+ if (endpoint != null) {
+ endpoint.end();
}
}
this.message = message;
this.topic = topic;
}
-
+ @Override
public void run() {
try {
}
} catch (Exception e) {
- logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION, e.getLocalizedMessage());
+ logger.error(RouterCoreMsgs.EVENT_PROCESSING_EXCEPTION,e,e.getLocalizedMessage());
} finally {
// log exception if an exception occurred and was not handled
if (message.getException() != null) {