Assume the username will not be obfuscated
[aai/router-core.git] / src / main / java / org / onap / aai / event / KafkaEventBusEndpoint.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.aai.event;
22
23 import org.apache.camel.Consumer;
24 import org.apache.camel.Processor;
25 import org.apache.camel.Producer;
26 import org.apache.camel.impl.DefaultEndpoint;
27 import org.apache.camel.spi.Metadata;
28 import org.apache.camel.spi.UriEndpoint;
29 import org.apache.camel.spi.UriParam;
30 import org.onap.aai.event.client.KafkaEventConsumer;
31
32 /**
33  * Represents a EventBus endpoint.
34  */
35 @UriEndpoint(scheme = "kafka-event-bus", syntax = "kafka-event-bus:name",
36     consumerClass = EventBusConsumer.class, title = "kafka-event-bus")
37 public class KafkaEventBusEndpoint extends AbstractEventBusEndpoint {
38   @UriParam(label = "url")
39   @Metadata(required = "true")
40   private String url;
41   @UriParam(label = "eventTopic")
42   @Metadata(required = "true")
43   private String eventTopic;
44   @UriParam(label = "consumerGroup")
45   @Metadata(required = "true")
46   private String consumerGroup;
47   @UriParam(label = "poolSize")
48   @Metadata(required = "true", defaultValue="20")
49   private int poolSize = 20;
50   @UriParam(label = "pollingDelay")
51   @Metadata(required = "true", defaultValue="30000")
52   private int pollingDelay = 30000;
53
54   private KafkaEventConsumer consumer;
55
56   public KafkaEventBusEndpoint(String uri, KafkaEventBusComponent component) {
57     super(uri, component);
58   }
59
60   @Override
61   public Producer createProducer() throws Exception {
62     return new EventBusProducer(this);
63   }
64
65   @Override
66   public Consumer createConsumer(Processor processor) throws Exception {
67     consumer = new KafkaEventConsumer(url, eventTopic, consumerGroup);
68     return new EventBusConsumer(this, processor, consumer);
69   }
70
71   @Override
72   public boolean isSingleton() {
73     return false;
74   }
75
76   public String getUrl() {
77     return url;
78   }
79
80   public void setUrl(String url) {
81     this.url = url;
82   }
83
84   @Override
85   String getEventTopic() {
86     return eventTopic;
87   }
88
89   public void setEventTopic(String eventTopic) {
90     this.eventTopic = eventTopic;
91   }
92
93   public String getConsumerGroup() {
94     return consumerGroup;
95   }
96
97   public void setConsumerGroup(String consumerGroup) {
98     this.consumerGroup = consumerGroup;
99   }
100
101   @Override
102   int getPoolSize() {
103     return poolSize;
104   }
105
106   public void setPoolSize(int poolSize) {
107     this.poolSize = poolSize;
108   }
109
110   @Override
111   void close() {
112     consumer.close();
113   }
114
115   @Override
116   int getPollingDelay() {
117     return pollingDelay;
118   }
119
120   public void setPollingDelay(int pollingDelay) {
121     this.pollingDelay = pollingDelay;
122   }
123
124
125
126
127
128
129 }