2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2022-2023 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.common.endpoints.event.comm.bus;
21 import com.google.re2j.Pattern;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Properties;
26 import org.apache.commons.lang3.StringUtils;
27 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedKafkaTopicSource;
29 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
30 import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils;
31 import org.onap.policy.common.endpoints.utils.PropertyUtils;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * Factory of KAFKA Source Topics indexed by topic name.
38 class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
/** Separator for comma-delimited lists; whitespace on either side of a comma is absorbed. */
private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");

/** Message of the exception raised when a topic name is null or empty. */
private static final String MISSING_TOPIC = "A topic must be provided";

/** Logger. */
private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);

/**
 * KAFKA Topic Name Index.
 */
protected HashMap<String, KafkaTopicSource> kafkaTopicSources = new HashMap<>();
53 public KafkaTopicSource build(BusTopicParams busTopicParams) {
54 if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
55 throw new IllegalArgumentException("KAFKA Server(s) must be provided");
58 if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
59 throw new IllegalArgumentException(MISSING_TOPIC);
63 if (kafkaTopicSources.containsKey(busTopicParams.getTopic())) {
64 return kafkaTopicSources.get(busTopicParams.getTopic());
67 var kafkaTopicSource = makeSource(busTopicParams);
69 kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource);
71 return kafkaTopicSource;
76 public List<KafkaTopicSource> build(Properties properties) {
78 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS);
79 if (StringUtils.isBlank(readTopics)) {
80 logger.info("{}: no topic for KAFKA Source", this);
81 return new ArrayList<>();
84 List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>();
86 for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
87 addTopic(newKafkaTopicSources, topic.toLowerCase(), properties);
90 return newKafkaTopicSources;
94 public KafkaTopicSource build(List<String> servers, String topic) {
95 return this.build(BusTopicParams.builder()
99 .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
100 .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
101 .useHttps(false).build());
104 private void addTopic(List<KafkaTopicSource> newKafkaTopicSources, String topic, Properties properties) {
105 if (this.kafkaTopicSources.containsKey(topic)) {
106 newKafkaTopicSources.add(this.kafkaTopicSources.get(topic));
110 String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic;
112 var props = new PropertyUtils(properties, topicPrefix,
113 (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic source {} ",
114 this, name, value, topic));
116 String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
117 if (StringUtils.isBlank(servers)) {
118 logger.error("{}: no KAFKA servers configured for source {}", this, topic);
122 var kafkaTopicSource = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers)
123 .consumerGroup(props.getString(
124 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
125 .consumerInstance(props.getString(
126 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
127 .fetchTimeout(props.getInteger(
128 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
129 PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
130 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
131 PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
134 newKafkaTopicSources.add(kafkaTopicSource);
138 * Makes a new source.
140 * @param busTopicParams parameters to use to configure the source
141 * @return a new source
143 protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) {
144 return new SingleThreadedKafkaTopicSource(busTopicParams);
148 public void destroy(String topic) {
150 if (topic == null || topic.isEmpty()) {
151 throw new IllegalArgumentException(MISSING_TOPIC);
154 KafkaTopicSource kafkaTopicSource;
156 synchronized (this) {
157 if (!kafkaTopicSources.containsKey(topic)) {
161 kafkaTopicSource = kafkaTopicSources.remove(topic);
164 kafkaTopicSource.shutdown();
168 public void destroy() {
169 List<KafkaTopicSource> readers = this.inventory();
170 for (KafkaTopicSource reader : readers) {
174 synchronized (this) {
175 this.kafkaTopicSources.clear();
180 public KafkaTopicSource get(String topic) {
182 if (topic == null || topic.isEmpty()) {
183 throw new IllegalArgumentException(MISSING_TOPIC);
186 synchronized (this) {
187 if (kafkaTopicSources.containsKey(topic)) {
188 return kafkaTopicSources.get(topic);
190 throw new IllegalStateException("KafkaTopiceSource for " + topic + " not found");
196 public synchronized List<KafkaTopicSource> inventory() {
197 return new ArrayList<>(this.kafkaTopicSources.values());
201 public String toString() {
202 return "IndexedKafkaTopicSourceFactory " + kafkaTopicSources.keySet();