2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2022 Nordix Foundation.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 * ============LICENSE_END=========================================================
19 package org.onap.policy.common.endpoints.event.comm.bus;
21 import com.google.re2j.Pattern;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Properties;
26 import org.apache.commons.lang3.StringUtils;
27 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineKafkaTopicSink;
29 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
30 import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils;
31 import org.onap.policy.common.endpoints.utils.PropertyUtils;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * Factory of KAFKA Sink (writer) Topics indexed by topic name.
38 class IndexedKafkaTopicSinkFactory implements KafkaTopicSinkFactory {
39 private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
40 private static final String MISSING_TOPIC = "A topic must be provided";
45 private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSinkFactory.class);
48 * KAFKA Topic Name Index.
50 protected HashMap<String, KafkaTopicSink> kafkaTopicSinks = new HashMap<>();
53 public KafkaTopicSink build(BusTopicParams busTopicParams) {
55 if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
56 throw new IllegalArgumentException("KAFKA Server(s) must be provided");
59 if (StringUtils.isBlank(busTopicParams.getTopic())) {
60 throw new IllegalArgumentException(MISSING_TOPIC);
64 if (kafkaTopicSinks.containsKey(busTopicParams.getTopic())) {
65 return kafkaTopicSinks.get(busTopicParams.getTopic());
68 KafkaTopicSink kafkaTopicWriter = makeSink(busTopicParams);
69 if (busTopicParams.isManaged()) {
70 kafkaTopicSinks.put(busTopicParams.getTopic(), kafkaTopicWriter);
73 return kafkaTopicWriter;
/**
 * Convenience overload: delegates to the {@code BusTopicParams}-based build with parameters
 * assembled through {@code BusTopicParams.builder()}.
 *
 * <p>NOTE(review): the rest of the builder chain is not visible in this view; confirm which
 * options (presumably at least the servers and the topic) are set before relying on this
 * description.
 *
 * @param servers list of KAFKA servers
 * @param topic topic name
 * @return the sink produced by the delegated build call
 */
79 public KafkaTopicSink build(List<String> servers, String topic) {
80 return this.build(BusTopicParams.builder()
90 public List<KafkaTopicSink> build(Properties properties) {
92 String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS);
93 if (StringUtils.isBlank(writeTopics)) {
94 logger.info("{}: no topic for KAFKA Sink", this);
95 return new ArrayList<>();
98 List<KafkaTopicSink> newKafkaTopicSinks = new ArrayList<>();
100 for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
101 addTopic(newKafkaTopicSinks, topic, properties);
103 return newKafkaTopicSinks;
107 private void addTopic(List<KafkaTopicSink> newKafkaTopicSinks, String topic, Properties properties) {
108 if (this.kafkaTopicSinks.containsKey(topic)) {
109 newKafkaTopicSinks.add(this.kafkaTopicSinks.get(topic));
113 String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS + "." + topic;
115 var props = new PropertyUtils(properties, topicPrefix,
116 (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
118 String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
119 if (StringUtils.isBlank(servers)) {
120 logger.error("{}: no KAFKA servers configured for sink {}", this, topic);
124 KafkaTopicSink kafkaTopicWriter = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers)
125 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
127 newKafkaTopicSinks.add(kafkaTopicWriter);
131 public void destroy(String topic) {
133 if (topic == null || topic.isEmpty()) {
134 throw new IllegalArgumentException(MISSING_TOPIC);
137 KafkaTopicSink kafkaTopicWriter;
138 synchronized (this) {
139 if (!kafkaTopicSinks.containsKey(topic)) {
143 kafkaTopicWriter = kafkaTopicSinks.remove(topic);
146 kafkaTopicWriter.shutdown();
150 public void destroy() {
151 List<KafkaTopicSink> writers = this.inventory();
152 for (KafkaTopicSink writer : writers) {
156 synchronized (this) {
157 this.kafkaTopicSinks.clear();
162 public KafkaTopicSink get(String topic) {
164 if (topic == null || topic.isEmpty()) {
165 throw new IllegalArgumentException(MISSING_TOPIC);
168 synchronized (this) {
169 if (kafkaTopicSinks.containsKey(topic)) {
170 return kafkaTopicSinks.get(topic);
172 throw new IllegalStateException("KafkaTopicSink for " + topic + " not found");
178 public synchronized List<KafkaTopicSink> inventory() {
179 return new ArrayList<>(this.kafkaTopicSinks.values());
185 * @param busTopicParams parameters to use to configure the sink
188 protected KafkaTopicSink makeSink(BusTopicParams busTopicParams) {
189 return new InlineKafkaTopicSink(busTopicParams);
194 public String toString() {
195 return "IndexedKafkaTopicSinkFactory " + kafkaTopicSinks.keySet();