151d8f6983032279c0d385f8e79c67e4b34c782c
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * Copyright (C) 2022-2023 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ============LICENSE_END=========================================================
17  */
18
19 package org.onap.policy.common.endpoints.event.comm.bus;
20
21 import com.google.re2j.Pattern;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Properties;
26 import org.apache.commons.lang3.StringUtils;
27 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedKafkaTopicSource;
29 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
30 import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils;
31 import org.onap.policy.common.endpoints.utils.PropertyUtils;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Factory of KAFKA Source Topics indexed by topic name.
37  */
38 class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
39     private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
40     private static final String MISSING_TOPIC = "A topic must be provided";
41
42     /**
43      * Logger.
44      */
45     private static final Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
46
47     /**
48      * KAFKA Topic Name Index.
49      */
50     protected HashMap<String, KafkaTopicSource> kafkaTopicSources = new HashMap<>();
51
52     @Override
53     public KafkaTopicSource build(BusTopicParams busTopicParams) {
54         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
55             throw new IllegalArgumentException("KAFKA Server(s) must be provided");
56         }
57
58         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
59             throw new IllegalArgumentException(MISSING_TOPIC);
60         }
61
62         synchronized (this) {
63             if (kafkaTopicSources.containsKey(busTopicParams.getTopic())) {
64                 return kafkaTopicSources.get(busTopicParams.getTopic());
65             }
66
67             var kafkaTopicSource = makeSource(busTopicParams);
68
69             kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource);
70
71             return kafkaTopicSource;
72         }
73     }
74
75     @Override
76     public List<KafkaTopicSource> build(Properties properties) {
77
78         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS);
79         if (StringUtils.isBlank(readTopics)) {
80             logger.info("{}: no topic for KAFKA Source", this);
81             return new ArrayList<>();
82         }
83
84         List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>();
85         synchronized (this) {
86             for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
87                 addTopic(newKafkaTopicSources, topic.toLowerCase(), properties);
88             }
89         }
90         return newKafkaTopicSources;
91     }
92
93     @Override
94     public KafkaTopicSource build(List<String> servers, String topic) {
95         return this.build(BusTopicParams.builder()
96                 .servers(servers)
97                 .topic(topic)
98                 .managed(true)
99                 .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
100                 .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
101                 .useHttps(false).build());
102     }
103
104     private void addTopic(List<KafkaTopicSource> newKafkaTopicSources, String topic, Properties properties) {
105         if (this.kafkaTopicSources.containsKey(topic)) {
106             newKafkaTopicSources.add(this.kafkaTopicSources.get(topic));
107             return;
108         }
109
110         String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic;
111
112         var props = new PropertyUtils(properties, topicPrefix,
113             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic source {} ",
114                 this, name, value, topic));
115
116         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
117         if (StringUtils.isBlank(servers)) {
118             logger.error("{}: no KAFKA servers configured for source {}", this, topic);
119             return;
120         }
121
122         var kafkaTopicSource = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers)
123                 .consumerGroup(props.getString(
124                         PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
125                 .consumerInstance(props.getString(
126                         PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
127                 .fetchTimeout(props.getInteger(
128                         PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
129                         PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
130                 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
131                         PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
132                 .build());
133
134         newKafkaTopicSources.add(kafkaTopicSource);
135     }
136
137     /**
138      * Makes a new source.
139      *
140      * @param busTopicParams parameters to use to configure the source
141      * @return a new source
142      */
143     protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) {
144         return new SingleThreadedKafkaTopicSource(busTopicParams);
145     }
146
147     @Override
148     public void destroy(String topic) {
149
150         if (topic == null || topic.isEmpty()) {
151             throw new IllegalArgumentException(MISSING_TOPIC);
152         }
153
154         KafkaTopicSource kafkaTopicSource;
155
156         synchronized (this) {
157             if (!kafkaTopicSources.containsKey(topic)) {
158                 return;
159             }
160
161             kafkaTopicSource = kafkaTopicSources.remove(topic);
162         }
163
164         kafkaTopicSource.shutdown();
165     }
166
167     @Override
168     public void destroy() {
169         List<KafkaTopicSource> readers = this.inventory();
170         for (KafkaTopicSource reader : readers) {
171             reader.shutdown();
172         }
173
174         synchronized (this) {
175             this.kafkaTopicSources.clear();
176         }
177     }
178
179     @Override
180     public KafkaTopicSource get(String topic) {
181
182         if (topic == null || topic.isEmpty()) {
183             throw new IllegalArgumentException(MISSING_TOPIC);
184         }
185
186         synchronized (this) {
187             if (kafkaTopicSources.containsKey(topic)) {
188                 return kafkaTopicSources.get(topic);
189             } else {
190                 throw new IllegalStateException("KafkaTopiceSource for " + topic + " not found");
191             }
192         }
193     }
194
195     @Override
196     public synchronized List<KafkaTopicSource> inventory() {
197         return new ArrayList<>(this.kafkaTopicSources.values());
198     }
199
200     @Override
201     public String toString() {
202         return "IndexedKafkaTopicSourceFactory " + kafkaTopicSources.keySet();
203     }
204 }