2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.impl;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
28 import java.util.Properties;
30 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
31 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSourceFactory;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
/**
 * Factory of DMaaP source topics, indexed by topic name.
 */
41 public class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
42 private static final String MISSING_TOPIC = "A topic must be provided";
44 private static final IndexedDmaapTopicSourceFactory instance = new IndexedDmaapTopicSourceFactory();
49 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
52 * DMaaP Topic Name Index
54 protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
57 * Get the singleton instance.
59 * @return the instance
61 public static IndexedDmaapTopicSourceFactory getInstance() {
/**
 * Not instantiable from outside this class; the factory is used through its singleton instance.
 */
private IndexedDmaapTopicSourceFactory() {
    // intentionally empty
}
71 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
72 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
73 String environment, String aftEnvironment, String partner, String latitude, String longitude,
74 Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
76 if (topic == null || topic.isEmpty()) {
77 throw new IllegalArgumentException(MISSING_TOPIC);
81 if (dmaapTopicSources.containsKey(topic)) {
82 return dmaapTopicSources.get(topic);
85 DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret,
86 userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment,
87 aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
90 dmaapTopicSources.put(topic, dmaapTopicSource);
93 return dmaapTopicSource;
101 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
102 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
103 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
105 if (servers == null || servers.isEmpty()) {
106 throw new IllegalArgumentException("DMaaP Server(s) must be provided");
109 if (topic == null || topic.isEmpty()) {
110 throw new IllegalArgumentException(MISSING_TOPIC);
113 synchronized (this) {
114 if (dmaapTopicSources.containsKey(topic)) {
115 return dmaapTopicSources.get(topic);
118 DmaapTopicSource dmaapTopicSource =
119 new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password,
120 consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
123 dmaapTopicSources.put(topic, dmaapTopicSource);
126 return dmaapTopicSource;
134 public List<DmaapTopicSource> build(Properties properties) {
136 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
137 if (readTopics == null || readTopics.isEmpty()) {
138 logger.info("{}: no topic for DMaaP Source", this);
139 return new ArrayList<>();
141 List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
143 List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
144 synchronized (this) {
145 for (String topic : readTopicList) {
146 if (this.dmaapTopicSources.containsKey(topic)) {
147 dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
151 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
152 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
154 List<String> serverList;
155 if (servers != null && !servers.isEmpty()) {
156 serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
158 serverList = new ArrayList<>();
161 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
162 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
164 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
165 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
167 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
168 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
170 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
171 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
173 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
174 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
176 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
177 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
179 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
180 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
182 /* DME2 Properties */
184 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
185 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
187 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
188 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
190 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
191 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
193 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
194 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
196 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
197 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
199 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
200 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
202 String dme2EpReadTimeoutMs =
203 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
204 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
206 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
207 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
209 String dme2RoundtripTimeoutMs =
210 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
211 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
213 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
214 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
216 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
217 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
219 String dme2SessionStickinessRequired =
220 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
221 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
223 Map<String, String> dme2AdditionalProps = new HashMap<>();
225 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
226 dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
228 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
229 dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
231 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
232 dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
234 if (dme2Version != null && !dme2Version.isEmpty()) {
235 dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
237 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
238 dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
240 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
241 dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
243 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
244 dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
248 if (servers == null || servers.isEmpty()) {
250 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
254 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
255 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
257 fetchTimeout = Integer.parseInt(fetchTimeoutString);
258 } catch (NumberFormatException nfe) {
259 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
264 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
265 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
266 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
267 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
269 fetchLimit = Integer.parseInt(fetchLimitString);
270 } catch (NumberFormatException nfe) {
271 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
276 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
277 + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
278 boolean managed = true;
279 if (managedString != null && !managedString.isEmpty()) {
280 managed = Boolean.parseBoolean(managedString);
283 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
284 + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
286 // default is to use HTTP if no https property exists
287 boolean useHttps = false;
288 if (useHttpsString != null && !useHttpsString.isEmpty()) {
289 useHttps = Boolean.parseBoolean(useHttpsString);
292 String allowSelfSignedCertsString =
293 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
294 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
296 // default is to disallow self-signed certs
297 boolean allowSelfSignedCerts = false;
298 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
299 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
303 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
304 aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
305 dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
306 useHttps, allowSelfSignedCerts);
308 dmaapTopicSourceLst.add(uebTopicSource);
311 return dmaapTopicSourceLst;
317 * @throws IllegalArgumentException
320 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
321 return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
322 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
328 * @throws IllegalArgumentException
331 public DmaapTopicSource build(List<String> servers, String topic) {
332 return this.build(servers, topic, null, null);
339 public void destroy(String topic) {
341 if (topic == null || topic.isEmpty()) {
342 throw new IllegalArgumentException(MISSING_TOPIC);
345 DmaapTopicSource uebTopicSource;
347 synchronized (this) {
348 if (!dmaapTopicSources.containsKey(topic)) {
352 uebTopicSource = dmaapTopicSources.remove(topic);
355 uebTopicSource.shutdown();
362 public DmaapTopicSource get(String topic) {
364 if (topic == null || topic.isEmpty()) {
365 throw new IllegalArgumentException(MISSING_TOPIC);
368 synchronized (this) {
369 if (dmaapTopicSources.containsKey(topic)) {
370 return dmaapTopicSources.get(topic);
372 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
378 public synchronized List<DmaapTopicSource> inventory() {
379 return new ArrayList<>(this.dmaapTopicSources.values());
383 public void destroy() {
384 List<DmaapTopicSource> readers = this.inventory();
385 for (DmaapTopicSource reader : readers) {
389 synchronized (this) {
390 this.dmaapTopicSources.clear();
395 public String toString() {
396 StringBuilder builder = new StringBuilder();
397 builder.append("IndexedDmaapTopicSourceFactory []");
398 return builder.toString();