2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.impl;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
29 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSink;
30 import org.onap.policy.common.endpoints.event.comm.bus.UebTopicSinkFactory;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.InlineUebTopicSink;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Factory of UEB Reader Topics indexed by topic name
39 public class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
41 private static final IndexedUebTopicSinkFactory instance = new IndexedUebTopicSinkFactory();
43 private static final String MISSING_TOPIC = "A topic must be provided";
48 private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
51 * UEB Topic Name Index
53 protected HashMap<String, UebTopicSink> uebTopicSinks = new HashMap<>();
56 * Get the singleton instance.
58 * @return the instance
60 public static IndexedUebTopicSinkFactory getInstance() {
64 private IndexedUebTopicSinkFactory() {}
67 public UebTopicSink build(List<String> servers, String topic, String apiKey, String apiSecret, String partitionKey,
68 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
70 if (servers == null || servers.isEmpty()) {
71 throw new IllegalArgumentException("UEB Server(s) must be provided");
74 if (topic == null || topic.isEmpty()) {
75 throw new IllegalArgumentException(MISSING_TOPIC);
79 if (uebTopicSinks.containsKey(topic)) {
80 return uebTopicSinks.get(topic);
83 UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, apiKey, apiSecret, partitionKey,
84 useHttps, allowSelfSignedCerts);
87 uebTopicSinks.put(topic, uebTopicWriter);
90 return uebTopicWriter;
96 public UebTopicSink build(List<String> servers, String topic) {
97 return this.build(servers, topic, null, null, null, true, false, false);
102 public List<UebTopicSink> build(Properties properties) {
104 String writeTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS);
105 if (writeTopics == null || writeTopics.isEmpty()) {
106 logger.info("{}: no topic for UEB Sink", this);
107 return new ArrayList<>();
110 List<String> writeTopicList = new ArrayList<>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
111 List<UebTopicSink> newUebTopicSinks = new ArrayList<>();
112 synchronized (this) {
113 for (String topic : writeTopicList) {
114 if (this.uebTopicSinks.containsKey(topic)) {
115 newUebTopicSinks.add(this.uebTopicSinks.get(topic));
119 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
120 + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
121 if (servers == null || servers.isEmpty()) {
122 logger.error("{}: no UEB servers configured for sink {}", this, topic);
126 List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
128 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
129 + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
130 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
131 + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
132 String partitionKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
133 + PolicyEndPointProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
135 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
136 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
137 boolean managed = true;
138 if (managedString != null && !managedString.isEmpty()) {
139 managed = Boolean.parseBoolean(managedString);
142 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic
143 + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
145 // default is to use HTTP if no https property exists
146 boolean useHttps = false;
147 if (useHttpsString != null && !useHttpsString.isEmpty()) {
148 useHttps = Boolean.parseBoolean(useHttpsString);
152 String allowSelfSignedCertsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS
153 + "." + topic + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
155 // default is to disallow self-signed certs
156 boolean allowSelfSignedCerts = false;
157 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
158 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
161 UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, partitionKey, managed,
162 useHttps, allowSelfSignedCerts);
163 newUebTopicSinks.add(uebTopicWriter);
165 return newUebTopicSinks;
170 public void destroy(String topic) {
172 if (topic == null || topic.isEmpty()) {
173 throw new IllegalArgumentException(MISSING_TOPIC);
176 UebTopicSink uebTopicWriter;
177 synchronized (this) {
178 if (!uebTopicSinks.containsKey(topic)) {
182 uebTopicWriter = uebTopicSinks.remove(topic);
185 uebTopicWriter.shutdown();
189 public void destroy() {
190 List<UebTopicSink> writers = this.inventory();
191 for (UebTopicSink writer : writers) {
195 synchronized (this) {
196 this.uebTopicSinks.clear();
201 public UebTopicSink get(String topic) {
203 if (topic == null || topic.isEmpty()) {
204 throw new IllegalArgumentException(MISSING_TOPIC);
207 synchronized (this) {
208 if (uebTopicSinks.containsKey(topic)) {
209 return uebTopicSinks.get(topic);
211 throw new IllegalStateException("UebTopicSink for " + topic + " not found");
217 public synchronized List<UebTopicSink> inventory() {
218 return new ArrayList<>(this.uebTopicSinks.values());
223 public String toString() {
224 StringBuilder builder = new StringBuilder();
225 builder.append("IndexedUebTopicSinkFactory []");
226 return builder.toString();