2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2019, 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.List;
26 import java.util.Properties;
27 import java.util.regex.Pattern;
28 import org.apache.commons.lang3.StringUtils;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.PropertyUtils;
33 import org.onap.policy.common.endpoints.utils.UebPropertyUtils;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Factory of UEB Reader Topics indexed by topic name.
40 class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
41 private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
42 private static final String MISSING_TOPIC = "A topic must be provided";
47 private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
50 * UEB Topic Name Index.
52 protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
55 public UebTopicSink build(BusTopicParams busTopicParams) {
57 if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
58 throw new IllegalArgumentException("UEB Server(s) must be provided");
61 if (StringUtils.isBlank(busTopicParams.getTopic())) {
62 throw new IllegalArgumentException(MISSING_TOPIC);
66 if (uebTopicSinks.containsKey(busTopicParams.getTopic())) {
67 return uebTopicSinks.get(busTopicParams.getTopic());
70 UebTopicSink uebTopicWriter = makeSink(busTopicParams);
72 if (busTopicParams.isManaged()) {
73 uebTopicSinks.put(busTopicParams.getTopic(), uebTopicWriter);
76 return uebTopicWriter;
82 public UebTopicSink build(List<String> servers, String topic) {
83 return this.build(BusTopicParams.builder()
88 .allowSelfSignedCerts(false)
94 public List<UebTopicSink> build(Properties properties) {
96 String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
97 if (StringUtils.isBlank(writeTopics)) {
98 logger.info("{}: no topic for UEB Sink", this);
99 return new ArrayList<>();
102 List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
103 synchronized (this) {
104 for (String topic : COMMA_SPACE_PAT.split(writeTopics)) {
105 addTopic(newUebTopicSinks, topic, properties);
107 return newUebTopicSinks;
111 private void addTopic(List<UebTopicSink> newUebTopicSinks, String topic, Properties properties) {
112 if (this.uebTopicSinks.containsKey(topic)) {
113 newUebTopicSinks.add(this.uebTopicSinks.get(topic));
117 String topicPrefix = PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic;
119 var props = new PropertyUtils(properties, topicPrefix,
120 (name, value, ex) -> logger.warn("{}: {} {} is in invalid format for topic {} ", this, name, value, topic));
122 String servers = properties.getProperty(topicPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
123 if (StringUtils.isBlank(servers)) {
124 logger.error("{}: no UEB servers configured for sink {}", this, topic);
128 UebTopicSink uebTopicWriter = this.build(UebPropertyUtils.makeBuilder(props, topic, servers)
129 .partitionId(props.getString(PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX, null))
131 newUebTopicSinks.add(uebTopicWriter);
135 public void destroy(String topic) {
137 if (topic == null || topic.isEmpty()) {
138 throw new IllegalArgumentException(MISSING_TOPIC);
141 UebTopicSink uebTopicWriter;
142 synchronized (this) {
143 if (!uebTopicSinks.containsKey(topic)) {
147 uebTopicWriter = uebTopicSinks.remove(topic);
150 uebTopicWriter.shutdown();
154 public void destroy() {
155 List<UebTopicSink> writers = this.inventory();
156 for (UebTopicSink writer : writers) {
160 synchronized (this) {
161 this.uebTopicSinks.clear();
166 public UebTopicSink get(String topic) {
168 if (topic == null || topic.isEmpty()) {
169 throw new IllegalArgumentException(MISSING_TOPIC);
172 synchronized (this) {
173 if (uebTopicSinks.containsKey(topic)) {
174 return uebTopicSinks.get(topic);
176 throw new IllegalStateException("UebTopicSink for " + topic + " not found");
182 public synchronized List<UebTopicSink> inventory() {
183 return new ArrayList<>(this.uebTopicSinks.values());
189 * @param busTopicParams parameters to use to configure the sink
192 protected UebTopicSink makeSink(BusTopicParams busTopicParams) {
193 return new InlineUebTopicSink(busTopicParams);
198 public String toString() {
199 return "IndexedUebTopicSinkFactory []";