2  * ============LICENSE_START=======================================================
 
   3  * Copyright (C) 2022-2023 Nordix Foundation.
 
   4  * ================================================================================
 
   5  * Licensed under the Apache License, Version 2.0 (the "License");
 
   6  * you may not use this file except in compliance with the License.
 
   7  * You may obtain a copy of the License at
 
   9  *      http://www.apache.org/licenses/LICENSE-2.0
 
  11  * Unless required by applicable law or agreed to in writing, software
 
  12  * distributed under the License is distributed on an "AS IS" BASIS,
 
  13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  14  * See the License for the specific language governing permissions and
 
  15  * limitations under the License.
 
  16  * ============LICENSE_END=========================================================
 
  19 package org.onap.policy.common.endpoints.event.comm.bus;
 
  21 import com.google.re2j.Pattern;
 
  22 import java.util.ArrayList;
 
  23 import java.util.HashMap;
 
  24 import java.util.List;
 
  25 import java.util.Properties;
 
  26 import org.apache.commons.lang3.StringUtils;
 
  27 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
 
  28 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedKafkaTopicSource;
 
  29 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
 
  30 import org.onap.policy.common.endpoints.utils.KafkaPropertyUtils;
 
  31 import org.onap.policy.common.endpoints.utils.PropertyUtils;
 
  32 import org.slf4j.Logger;
 
  33 import org.slf4j.LoggerFactory;
 
  36  * Factory of KAFKA Source Topics indexed by topic name.
 
  38 class IndexedKafkaTopicSourceFactory implements KafkaTopicSourceFactory {
 
  39     private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
 
  40     private static final String MISSING_TOPIC = "A topic must be provided";
 
  45     private static Logger logger = LoggerFactory.getLogger(IndexedKafkaTopicSourceFactory.class);
 
  48      * KAFKA Topic Name Index.
 
  50     protected HashMap<String, KafkaTopicSource> kafkaTopicSources = new HashMap<>();
 
  53     public KafkaTopicSource build(BusTopicParams busTopicParams) {
 
  54         if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
 
  55             throw new IllegalArgumentException("KAFKA Server(s) must be provided");
 
  58         if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
 
  59             throw new IllegalArgumentException(MISSING_TOPIC);
 
  63             if (kafkaTopicSources.containsKey(busTopicParams.getTopic())) {
 
  64                 return kafkaTopicSources.get(busTopicParams.getTopic());
 
  67             var kafkaTopicSource = makeSource(busTopicParams);
 
  69             kafkaTopicSources.put(busTopicParams.getTopic(), kafkaTopicSource);
 
  71             return kafkaTopicSource;
 
  76     public List<KafkaTopicSource> build(Properties properties) {
 
  78         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS);
 
  79         if (StringUtils.isBlank(readTopics)) {
 
  80             logger.info("{}: no topic for KAFKA Source", this);
 
  81             return new ArrayList<>();
 
  84         List<KafkaTopicSource> newKafkaTopicSources = new ArrayList<>();
 
  86             for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
 
  87                 addTopic(newKafkaTopicSources, topic, properties);
 
  90         return newKafkaTopicSources;
 
  94     public KafkaTopicSource build(List<String> servers, String topic) {
 
  95         return this.build(BusTopicParams.builder()
 
  99                 .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
 
 100                 .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
 
 101                 .useHttps(false).build());
 
 104     private void addTopic(List<KafkaTopicSource> newKafkaTopicSources, String topic, Properties properties) {
 
 105         if (this.kafkaTopicSources.containsKey(topic)) {
 
 106             newKafkaTopicSources.add(this.kafkaTopicSources.get(topic));
 
 110         String topicPrefix = PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS + "." + topic;
 
 112         var props = new PropertyUtils(properties, topicPrefix,
 
 113             (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
 
 115         String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
 
 116         if (StringUtils.isBlank(servers)) {
 
 117             logger.error("{}: no KAFKA servers configured for sink {}", this, topic);
 
 121         var kafkaTopicSource = this.build(KafkaPropertyUtils.makeBuilder(props, topic, servers)
 
 122                 .consumerGroup(props.getString(
 
 123                         PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
 
 124                 .consumerInstance(props.getString(
 
 125                         PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
 
 126                 .fetchTimeout(props.getInteger(
 
 127                         PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
 
 128                         PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
 
 129                 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
 
 130                         PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
 
 133         newKafkaTopicSources.add(kafkaTopicSource);
 
 137      * Makes a new source.
 
 139      * @param busTopicParams parameters to use to configure the source
 
 140      * @return a new source
 
 142     protected KafkaTopicSource makeSource(BusTopicParams busTopicParams) {
 
 143         return new SingleThreadedKafkaTopicSource(busTopicParams);
 
 147     public void destroy(String topic) {
 
 149         if (topic == null || topic.isEmpty()) {
 
 150             throw new IllegalArgumentException(MISSING_TOPIC);
 
 153         KafkaTopicSource kafkaTopicSource;
 
 155         synchronized (this) {
 
 156             if (!kafkaTopicSources.containsKey(topic)) {
 
 160             kafkaTopicSource = kafkaTopicSources.remove(topic);
 
 163         kafkaTopicSource.shutdown();
 
 167     public void destroy() {
 
 168         List<KafkaTopicSource> readers = this.inventory();
 
 169         for (KafkaTopicSource reader : readers) {
 
 173         synchronized (this) {
 
 174             this.kafkaTopicSources.clear();
 
 179     public KafkaTopicSource get(String topic) {
 
 181         if (topic == null || topic.isEmpty()) {
 
 182             throw new IllegalArgumentException(MISSING_TOPIC);
 
 185         synchronized (this) {
 
 186             if (kafkaTopicSources.containsKey(topic)) {
 
 187                 return kafkaTopicSources.get(topic);
 
 189                 throw new IllegalStateException("KafkaTopiceSource for " + topic + " not found");
 
 195     public synchronized List<KafkaTopicSource> inventory() {
 
 196         return new ArrayList<>(this.kafkaTopicSources.values());
 
 200     public String toString() {
 
 201         return "IndexedKafkaTopicSourceFactory " + kafkaTopicSources.keySet();