2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
30 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 * UEB Topic Sink Factory
37 public interface UebTopicSinkFactory {
40 * Instantiates a new UEB Topic Writer
42 * @param servers list of servers
43 * @param topic topic name
44 * @param apiKey API Key
45 * @param apiSecret API Secret
46 * @param partitionKey Consumer Group
47 * @param managed is this sink endpoint managed?
49 * @return an UEB Topic Sink
50 * @throws IllegalArgumentException if invalid parameters are present
52 public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey,
53 boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
56 * Creates an UEB Topic Writer based on properties files
58 * @param properties Properties containing initialization values
60 * @return an UEB Topic Writer
61 * @throws IllegalArgumentException if invalid parameters are present
63 public List<UebTopicSink> build(Properties properties);
66 * Instantiates a new UEB Topic Writer
68 * @param servers list of servers
69 * @param topic topic name
71 * @return an UEB Topic Writer
72 * @throws IllegalArgumentException if invalid parameters are present
74 public UebTopicSink build(List<String> servers, String topic);
77 * Destroys an UEB Topic Writer based on a topic
79 * @param topic topic name
80 * @throws IllegalArgumentException if invalid parameters are present
82 public void destroy(String topic);
85 * gets an UEB Topic Writer based on topic name
87 * @param topic the topic name
89 * @return an UEB Topic Writer with topic name
90 * @throws IllegalArgumentException if an invalid topic is provided
91 * @throws IllegalStateException if the UEB Topic Reader is an incorrect state
93 public UebTopicSink get(String topic);
96 * Provides a snapshot of the UEB Topic Writers
98 * @return a list of the UEB Topic Writers
100 public List<UebTopicSink> inventory();
103 * Destroys all UEB Topic Writers
105 public void destroy();
109 /* ------------- implementation ----------------- */
112 * Factory of UEB Reader Topics indexed by topic name
114 class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
115 private static final String MISSING_TOPIC = "A topic must be provided";
120 private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
123 * UEB Topic Name Index
125 protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
128 public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey,
129 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
131 if (servers == null || servers.isEmpty()) {
132 throw new IllegalArgumentException("UEB Server(s) must be provided");
135 if (topic == null || topic.isEmpty()) {
136 throw new IllegalArgumentException(MISSING_TOPIC);
139 synchronized (this) {
140 if (uebTopicSinks.containsKey(topic)) {
141 return uebTopicSinks.get(topic);
144 UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey,
145 useHttps, allowSelfSignedCerts);
148 uebTopicSinks.put(topic, uebTopicWriter);
151 return uebTopicWriter;
157 public UebTopicSink build(List<String> servers, String topic) {
158 return this.build(servers, topic, null, null, null, true, false, false);
163 public List<UebTopicSink> build(Properties properties) {
165 String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
166 if (writeTopics == null || writeTopics.isEmpty()) {
167 logger.info("{}: no topic for UEB Sink", this);
168 return new ArrayList<>();
171 List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
172 List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
173 synchronized (this) {
174 for (String topic : writeTopicList) {
175 if (this.uebTopicSinks.containsKey(topic)) {
176 newUebTopicSinks.add(this.uebTopicSinks.get(topic));
180 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
181 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
182 if (servers == null || servers.isEmpty()) {
183 logger.error("{}: no UEB servers configured for sink {}", this, topic);
187 List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
189 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
190 + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
191 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
192 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
193 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
194 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
196 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
197 + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
198 boolean managed = true;
199 if (managedString != null && !managedString.isEmpty()) {
200 managed = Boolean.parseBoolean(managedString);
203 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "."
204 + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
206 // default is to use HTTP if no https property exists
207 boolean useHttps = false;
208 if (useHttpsString != null && !useHttpsString.isEmpty()) {
209 useHttps = Boolean.parseBoolean(useHttpsString);
213 String allowSelfSignedCertsString =
214 properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
215 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
217 // default is to disallow self-signed certs
218 boolean allowSelfSignedCerts = false;
219 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
220 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
223 UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed,
224 useHttps, allowSelfSignedCerts);
225 newUebTopicSinks.add(uebTopicWriter);
227 return newUebTopicSinks;
232 public void destroy(String topic) {
234 if (topic == null || topic.isEmpty()) {
235 throw new IllegalArgumentException(MISSING_TOPIC);
238 UebTopicSink uebTopicWriter;
239 synchronized (this) {
240 if (!uebTopicSinks.containsKey(topic)) {
244 uebTopicWriter = uebTopicSinks.remove(topic);
247 uebTopicWriter.shutdown();
251 public void destroy() {
252 List<UebTopicSink> writers = this.inventory();
253 for (UebTopicSink writer : writers) {
257 synchronized (this) {
258 this.uebTopicSinks.clear();
263 public UebTopicSink get(String topic) {
265 if (topic == null || topic.isEmpty()) {
266 throw new IllegalArgumentException(MISSING_TOPIC);
269 synchronized (this) {
270 if (uebTopicSinks.containsKey(topic)) {
271 return uebTopicSinks.get(topic);
273 throw new IllegalStateException("UebTopicSink for " + topic + " not found");
279 public synchronized List<UebTopicSink> inventory() {
280 return new ArrayList<>(this.uebTopicSinks.values());
285 public String toString() {
286 StringBuilder builder = new StringBuilder();
287 builder.append("IndexedUebTopicSinkFactory []");
288 return builder.toString();