2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * DMAAP Topic Source Factory.
40 public interface DmaapTopicSourceFactory {
41 public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42 public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43 public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44 public final String DME2_VERSION_PROPERTY = "Version";
45 public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46 public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47 public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48 public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
51 * Creates an DMAAP Topic Source based on properties files.
53 * @param properties Properties containing initialization values
55 * @return an DMAAP Topic Source
56 * @throws IllegalArgumentException if invalid parameters are present
58 public List<DmaapTopicSource> build(Properties properties);
61 * Instantiates a new DMAAP Topic Source.
63 * @param servers list of servers
64 * @param topic topic name
65 * @param apiKey API Key
66 * @param apiSecret API Secret
67 * @param userName user name
68 * @param password password
69 * @param consumerGroup Consumer Group
70 * @param consumerInstance Consumer Instance
71 * @param fetchTimeout Read Fetch Timeout
72 * @param fetchLimit Fetch Limit
73 * @param managed is this endpoind managed?
74 * @param useHttps does the connection use HTTPS?
75 * @param allowSelfSignedCerts does connection allow self-signed certificates?
77 * @return an DMAAP Topic Source
78 * @throws IllegalArgumentException if invalid parameters are present
80 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
81 String password, String consumerGroup, String consumerInstance,
82 int fetchTimeout, int fetchLimit,
83 boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
86 * Instantiates a new DMAAP Topic Source.
88 * @param servers list of servers
89 * @param topic topic name
90 * @param apiKey API Key
91 * @param apiSecret API Secret
92 * @param userName user name
93 * @param password password
94 * @param consumerGroup Consumer Group
95 * @param consumerInstance Consumer Instance
96 * @param fetchTimeout Read Fetch Timeout
97 * @param fetchLimit Fetch Limit
98 * @param environment DME2 environment
99 * @param aftEnvironment DME2 AFT environment
100 * @param partner DME2 Partner
101 * @param latitude DME2 latitude
102 * @param longitude DME2 longitude
103 * @param additionalProps additional properties to pass to DME2
104 * @param managed is this endpoind managed?
105 * @param useHttps does the connection use HTTPS?
106 * @param allowSelfSignedCerts does connection allow self-signed certificates?
108 * @return an DMAAP Topic Source
109 * @throws IllegalArgumentException if invalid parameters are present
111 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
112 String password, String consumerGroup, String consumerInstance, int fetchTimeout,
113 int fetchLimit, String environment, String aftEnvironment, String partner,
114 String latitude, String longitude, Map<String, String> additionalProps,
115 boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
118 * Instantiates a new DMAAP Topic Source.
120 * @param servers list of servers
121 * @param topic topic name
122 * @param apiKey API Key
123 * @param apiSecret API Secret
125 * @return an DMAAP Topic Source
126 * @throws IllegalArgumentException if invalid parameters are present
128 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
131 * Instantiates a new DMAAP Topic Source.
133 * @param servers list of servers
134 * @param topic topic name
136 * @return an DMAAP Topic Source
137 * @throws IllegalArgumentException if invalid parameters are present
139 public DmaapTopicSource build(List<String> servers, String topic);
142 * Destroys an DMAAP Topic Source based on a topic.
144 * @param topic topic name
145 * @throws IllegalArgumentException if invalid parameters are present
147 public void destroy(String topic);
150 * Destroys all DMAAP Topic Sources.
152 public void destroy();
155 * Gets an DMAAP Topic Source based on topic name.
157 * @param topic the topic name
158 * @return an DMAAP Topic Source with topic name
159 * @throws IllegalArgumentException if an invalid topic is provided
160 * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
162 public DmaapTopicSource get(String topic);
165 * Provides a snapshot of the DMAAP Topic Sources.
167 * @return a list of the DMAAP Topic Sources
169 public List<DmaapTopicSource> inventory();
173 /* ------------- implementation ----------------- */
176 * Factory of DMAAP Source Topics indexed by topic name.
179 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
180 private static final String MISSING_TOPIC = "A topic must be provided";
185 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
188 * DMaaP Topic Name Index.
190 protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
196 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
197 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
198 String environment, String aftEnvironment, String partner, String latitude, String longitude,
199 Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
201 if (topic == null || topic.isEmpty()) {
202 throw new IllegalArgumentException(MISSING_TOPIC);
205 synchronized (this) {
206 if (dmaapTopicSources.containsKey(topic)) {
207 return dmaapTopicSources.get(topic);
210 DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
214 .apiSecret(apiSecret)
217 .consumerGroup(consumerGroup)
218 .consumerInstance(consumerInstance)
219 .fetchTimeout(fetchTimeout)
220 .fetchLimit(fetchLimit)
221 .environment(environment)
222 .aftEnvironment(aftEnvironment)
225 .longitude(longitude)
226 .additionalProps(additionalProps)
228 .allowSelfSignedCerts(allowSelfSignedCerts)
232 dmaapTopicSources.put(topic, dmaapTopicSource);
235 return dmaapTopicSource;
243 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
244 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
245 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
247 if (servers == null || servers.isEmpty()) {
248 throw new IllegalArgumentException("DMaaP Server(s) must be provided");
251 if (topic == null || topic.isEmpty()) {
252 throw new IllegalArgumentException(MISSING_TOPIC);
255 synchronized (this) {
256 if (dmaapTopicSources.containsKey(topic)) {
257 return dmaapTopicSources.get(topic);
260 DmaapTopicSource dmaapTopicSource =
261 new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
265 .apiSecret(apiSecret)
268 .consumerGroup(consumerGroup)
269 .consumerInstance(consumerInstance)
270 .fetchTimeout(fetchTimeout)
271 .fetchLimit(fetchLimit)
273 .allowSelfSignedCerts(allowSelfSignedCerts)
277 dmaapTopicSources.put(topic, dmaapTopicSource);
280 return dmaapTopicSource;
288 public List<DmaapTopicSource> build(Properties properties) {
290 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
291 if (readTopics == null || readTopics.isEmpty()) {
292 logger.info("{}: no topic for DMaaP Source", this);
293 return new ArrayList<>();
295 List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
297 List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
298 synchronized (this) {
299 for (String topic : readTopicList) {
300 if (this.dmaapTopicSources.containsKey(topic)) {
301 dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
305 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
306 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
308 List<String> serverList;
309 if (servers != null && !servers.isEmpty()) {
310 serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
312 serverList = new ArrayList<>();
315 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
316 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
318 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
319 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
321 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
322 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
324 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
325 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
327 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
328 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
330 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
331 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
333 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
334 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
336 /* DME2 Properties */
338 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
339 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
341 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
342 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
344 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
345 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
347 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
348 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
350 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
351 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
353 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
354 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
356 String dme2EpReadTimeoutMs =
357 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
358 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
360 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
361 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
363 String dme2RoundtripTimeoutMs =
364 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
365 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
367 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
368 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
370 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
371 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
373 String dme2SessionStickinessRequired =
374 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
375 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
377 Map<String, String> dme2AdditionalProps = new HashMap<>();
379 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
380 dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
382 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
383 dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
385 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
386 dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
388 if (dme2Version != null && !dme2Version.isEmpty()) {
389 dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
391 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
392 dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
394 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
395 dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
397 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
398 dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
402 if (servers == null || servers.isEmpty()) {
404 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
408 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
409 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
411 fetchTimeout = Integer.parseInt(fetchTimeoutString);
412 } catch (NumberFormatException nfe) {
413 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
418 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
419 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
420 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
421 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
423 fetchLimit = Integer.parseInt(fetchLimitString);
424 } catch (NumberFormatException nfe) {
425 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
430 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
431 + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
432 boolean managed = true;
433 if (managedString != null && !managedString.isEmpty()) {
434 managed = Boolean.parseBoolean(managedString);
437 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
438 + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
440 // default is to use HTTP if no https property exists
441 boolean useHttps = false;
442 if (useHttpsString != null && !useHttpsString.isEmpty()) {
443 useHttps = Boolean.parseBoolean(useHttpsString);
446 String allowSelfSignedCertsString =
447 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
448 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
450 // default is to disallow self-signed certs
451 boolean allowSelfSignedCerts = false;
452 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
453 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
457 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
458 aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
459 dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
460 useHttps, allowSelfSignedCerts);
462 dmaapTopicSourceLst.add(uebTopicSource);
465 return dmaapTopicSourceLst;
471 * @throws IllegalArgumentException
474 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
475 return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
476 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
482 * @throws IllegalArgumentException
485 public DmaapTopicSource build(List<String> servers, String topic) {
486 return this.build(servers, topic, null, null);
493 public void destroy(String topic) {
495 if (topic == null || topic.isEmpty()) {
496 throw new IllegalArgumentException(MISSING_TOPIC);
499 DmaapTopicSource uebTopicSource;
501 synchronized (this) {
502 if (!dmaapTopicSources.containsKey(topic)) {
506 uebTopicSource = dmaapTopicSources.remove(topic);
509 uebTopicSource.shutdown();
513 public void destroy() {
514 List<DmaapTopicSource> readers = this.inventory();
515 for (DmaapTopicSource reader : readers) {
519 synchronized (this) {
520 this.dmaapTopicSources.clear();
528 public DmaapTopicSource get(String topic) {
530 if (topic == null || topic.isEmpty()) {
531 throw new IllegalArgumentException(MISSING_TOPIC);
534 synchronized (this) {
535 if (dmaapTopicSources.containsKey(topic)) {
536 return dmaapTopicSources.get(topic);
538 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
544 public synchronized List<DmaapTopicSource> inventory() {
545 return new ArrayList<>(this.dmaapTopicSources.values());
549 public String toString() {
550 StringBuilder builder = new StringBuilder();
551 builder.append("IndexedDmaapTopicSourceFactory []");
552 return builder.toString();