2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import java.util.regex.Pattern;
28 import org.apache.commons.lang3.StringUtils;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.PropertyUtils;
33 import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Factory of UEB Source Topics indexed by topic name.
40 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
41 private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
42 private static final String MISSING_TOPIC = "A topic must be provided";
47 private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
50 * UEB Topic Name Index.
52 protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
55 public UebTopicSource build(BusTopicParams busTopicParams) {
56 if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
57 throw new IllegalArgumentException("UEB Server(s) must be provided");
60 if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
61 throw new IllegalArgumentException(MISSING_TOPIC);
65 if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
66 return uebTopicSources.get(busTopicParams.getTopic());
69 var uebTopicSource = makeSource(busTopicParams);
71 if (busTopicParams.isManaged()) {
72 uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
75 return uebTopicSource;
80 public List<UebTopicSource> build(Properties properties) {
82 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
83 if (StringUtils.isBlank(readTopics)) {
84 logger.info("{}: no topic for UEB Source", this);
85 return new ArrayList<>();
88 List<UebTopicSource> newUebTopicSources = new ArrayList<>();
90 for (String topic : COMMA_SPACE_PAT.split(readTopics)) {
91 addTopic(newUebTopicSources, topic, properties);
94 return newUebTopicSources;
98 public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
100 return this.build(BusTopicParams.builder()
104 .apiSecret(apiSecret)
105 .fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH)
106 .fetchLimit(PolicyEndPointProperties.DEFAULT_LIMIT_FETCH)
109 .allowSelfSignedCerts(true).build());
113 public UebTopicSource build(List<String> servers, String topic) {
114 return this.build(servers, topic, null, null);
117 private void addTopic(List<UebTopicSource> newUebTopicSources, String topic, Properties properties) {
118 if (this.uebTopicSources.containsKey(topic)) {
119 newUebTopicSources.add(this.uebTopicSources.get(topic));
123 String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic;
125 var props = new PropertyUtils(properties, topicPrefix,
126 (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
128 String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
129 if (StringUtils.isBlank(servers)) {
130 logger.error("{}: no UEB servers configured for sink {}", this, topic);
134 var uebTopicSource = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
135 .consumerGroup(props.getString(
136 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX, null))
137 .consumerInstance(props.getString(
138 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX, null))
139 .fetchTimeout(props.getInteger(
140 PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX,
141 PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH))
142 .fetchLimit(props.getInteger(PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX,
143 PolicyEndPointProperties.DEFAULT_LIMIT_FETCH))
146 newUebTopicSources.add(uebTopicSource);
150 * Makes a new source.
152 * @param busTopicParams parameters to use to configure the source
153 * @return a new source
155 protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
156 return new SingleThreadedUebTopicSource(busTopicParams);
160 public void destroy(String topic) {
162 if (topic == null || topic.isEmpty()) {
163 throw new IllegalArgumentException(MISSING_TOPIC);
166 UebTopicSource uebTopicSource;
168 synchronized (this) {
169 if (!uebTopicSources.containsKey(topic)) {
173 uebTopicSource = uebTopicSources.remove(topic);
176 uebTopicSource.shutdown();
180 public void destroy() {
181 List<UebTopicSource> readers = this.inventory();
182 for (UebTopicSource reader : readers) {
186 synchronized (this) {
187 this.uebTopicSources.clear();
192 public UebTopicSource get(String topic) {
194 if (topic == null || topic.isEmpty()) {
195 throw new IllegalArgumentException(MISSING_TOPIC);
198 synchronized (this) {
199 if (uebTopicSources.containsKey(topic)) {
200 return uebTopicSources.get(topic);
202 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
208 public synchronized List<UebTopicSource> inventory() {
209 return new ArrayList<>(this.uebTopicSources.values());
213 public String toString() {
214 return "IndexedUebTopicSourceFactory []";