/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.common.endpoints.event.comm.bus;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedUebTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * Factory of UEB Source Topics indexed by topic name.
38 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
39 private static final String MISSING_TOPIC = "A topic must be provided";
44 private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);
47 * UEB Topic Name Index.
49 protected HashMap<String, UebTopicSource> uebTopicSources = new HashMap<>();
52 public UebTopicSource build(BusTopicParams busTopicParams) {
53 if (busTopicParams.getServers() == null || busTopicParams.getServers().isEmpty()) {
54 throw new IllegalArgumentException("UEB Server(s) must be provided");
57 if (busTopicParams.getTopic() == null || busTopicParams.getTopic().isEmpty()) {
58 throw new IllegalArgumentException(MISSING_TOPIC);
62 if (uebTopicSources.containsKey(busTopicParams.getTopic())) {
63 return uebTopicSources.get(busTopicParams.getTopic());
66 UebTopicSource uebTopicSource = makeSource(busTopicParams);
68 if (busTopicParams.isManaged()) {
69 uebTopicSources.put(busTopicParams.getTopic(), uebTopicSource);
72 return uebTopicSource;
77 public List<UebTopicSource> build(Properties properties) {
79 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS);
80 if (readTopics == null || readTopics.isEmpty()) {
81 logger.info("{}: no topic for UEB Source", this);
82 return new ArrayList<>();
84 List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
86 List<UebTopicSource> newUebTopicSources = new ArrayList<>();
88 for (String topic : readTopicList) {
89 if (this.uebTopicSources.containsKey(topic)) {
90 newUebTopicSources.add(this.uebTopicSources.get(topic));
94 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
95 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
97 if (servers == null || servers.isEmpty()) {
98 logger.error("{}: no UEB servers configured for sink {}", this, topic);
102 final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
104 final String effectiveTopic = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
105 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_EFFECTIVE_TOPIC_SUFFIX, topic);
107 final String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
108 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
110 final String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
111 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
113 final String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
114 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
116 final String consumerInstance = properties.getProperty(
117 PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
118 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
120 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
121 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
122 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
123 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
125 fetchTimeout = Integer.parseInt(fetchTimeoutString);
126 } catch (NumberFormatException nfe) {
127 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
132 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS
133 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
134 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
135 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
137 fetchLimit = Integer.parseInt(fetchLimitString);
138 } catch (NumberFormatException nfe) {
139 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
144 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
145 + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
146 boolean managed = true;
147 if (managedString != null && !managedString.isEmpty()) {
148 managed = Boolean.parseBoolean(managedString);
151 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "."
152 + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
154 // default is to use HTTP if no https property exists
155 boolean useHttps = false;
156 if (useHttpsString != null && !useHttpsString.isEmpty()) {
157 useHttps = Boolean.parseBoolean(useHttpsString);
160 String allowSelfSignedCertsString =
161 properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic
162 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
164 // default is to disallow self-signed certs
165 boolean allowSelfSignedCerts = false;
166 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
167 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
170 UebTopicSource uebTopicSource = this.build(BusTopicParams.builder()
173 .effectiveTopic(effectiveTopic)
175 .apiSecret(apiSecret)
176 .consumerGroup(consumerGroup)
177 .consumerInstance(consumerInstance)
178 .fetchTimeout(fetchTimeout)
179 .fetchLimit(fetchLimit)
182 .allowSelfSignedCerts(allowSelfSignedCerts).build());
183 newUebTopicSources.add(uebTopicSource);
186 return newUebTopicSources;
190 public UebTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
192 return this.build(BusTopicParams.builder()
196 .apiSecret(apiSecret)
197 .fetchTimeout(UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH)
198 .fetchLimit(UebTopicSource.DEFAULT_LIMIT_FETCH)
201 .allowSelfSignedCerts(true).build());
205 public UebTopicSource build(List<String> servers, String topic) {
206 return this.build(servers, topic, null, null);
210 * Makes a new source.
212 * @param busTopicParams parameters to use to configure the source
213 * @return a new source
215 protected UebTopicSource makeSource(BusTopicParams busTopicParams) {
216 return new SingleThreadedUebTopicSource(busTopicParams);
220 public void destroy(String topic) {
222 if (topic == null || topic.isEmpty()) {
223 throw new IllegalArgumentException(MISSING_TOPIC);
226 UebTopicSource uebTopicSource;
228 synchronized (this) {
229 if (!uebTopicSources.containsKey(topic)) {
233 uebTopicSource = uebTopicSources.remove(topic);
236 uebTopicSource.shutdown();
240 public void destroy() {
241 List<UebTopicSource> readers = this.inventory();
242 for (UebTopicSource reader : readers) {
246 synchronized (this) {
247 this.uebTopicSources.clear();
252 public UebTopicSource get(String topic) {
254 if (topic == null || topic.isEmpty()) {
255 throw new IllegalArgumentException(MISSING_TOPIC);
258 synchronized (this) {
259 if (uebTopicSources.containsKey(topic)) {
260 return uebTopicSources.get(topic);
262 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
268 public synchronized List<UebTopicSource> inventory() {
269 return new ArrayList<>(this.uebTopicSources.values());
273 public String toString() {
274 StringBuilder builder = new StringBuilder();
275 builder.append("IndexedUebTopicSourceFactory []");
276 return builder.toString();