2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.aai.event;
23 import org.apache.camel.Consumer;
24 import org.apache.camel.Processor;
25 import org.apache.camel.Producer;
26 import org.apache.camel.impl.DefaultEndpoint;
27 import org.apache.camel.spi.Metadata;
28 import org.apache.camel.spi.UriEndpoint;
29 import org.apache.camel.spi.UriParam;
30 import org.onap.aai.event.client.KafkaEventConsumer;
33 * Represents a EventBus endpoint.
35 @UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name",
36 consumerClass = EventBusConsumer.class, title = "kafka-event-bus")
37 public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint {
38 @UriParam(label = "url")
39 @Metadata(required = "true")
41 @UriParam(label = "eventTopic")
42 @Metadata(required = "true")
43 private String eventTopic;
44 @UriParam(label = "consumerGroup")
45 @Metadata(required = "true")
46 private String consumerGroup;
47 @UriParam(label = "poolSize")
48 @Metadata(required = "true", defaultValue="20")
49 private int poolSize = 20;
50 @UriParam(label = "pollingDelay")
51 @Metadata(required = "true", defaultValue="30000")
52 private int pollingDelay = 30000;
54 private KafkaEventConsumer consumer;
56 public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) {
57 super(uri, component);
61 public Producer createProducer() throws Exception {
62 return new EventBusProducer(this);
66 public Consumer createConsumer(Processor processor) throws Exception {
67 consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup);
68 return new EventBusConsumer(this, processor, consumer);
72 public boolean isSingleton() {
76 public String getUrl() {
80 public void setUrl(String url) {
85 String getEventTopic() {
89 public void setEventTopic(String eventTopic) {
90 this.eventTopic = eventTopic;
93 public String getConsumerGroup() {
97 public void setConsumerGroup(String consumerGroup) {
98 this.consumerGroup = consumerGroup;
106 public void setPoolSize(int poolSize) {
107 this.poolSize = poolSize;
116 int getPollingDelay() {
120 public void setPollingDelay(int pollingDelay) {
121 this.pollingDelay = pollingDelay;