2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
38 * DMAAP Topic Source Factory
40 public interface DmaapTopicSourceFactory {
41 public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42 public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43 public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44 public final String DME2_VERSION_PROPERTY = "Version";
45 public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46 public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47 public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48 public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
51 * Creates an DMAAP Topic Source based on properties files
53 * @param properties Properties containing initialization values
55 * @return an DMAAP Topic Source
56 * @throws IllegalArgumentException if invalid parameters are present
58 public List<DmaapTopicSource> build(Properties properties);
61 * Instantiates a new DMAAP Topic Source
63 * @param servers list of servers
64 * @param topic topic name
65 * @param apiKey API Key
66 * @param apiSecret API Secret
67 * @param userName user name
68 * @param password password
69 * @param consumerGroup Consumer Group
70 * @param consumerInstance Consumer Instance
71 * @param fetchTimeout Read Fetch Timeout
72 * @param fetchLimit Fetch Limit
73 * @param managed is this endpoind managed?
74 * @param useHttps does the connection use HTTPS?
75 * @param allowSelfSignedCerts does connection allow self-signed certificates?
77 * @return an DMAAP Topic Source
78 * @throws IllegalArgumentException if invalid parameters are present
80 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
81 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
82 boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
85 * Instantiates a new DMAAP Topic Source
87 * @param servers list of servers
88 * @param topic topic name
89 * @param apiKey API Key
90 * @param apiSecret API Secret
91 * @param userName user name
92 * @param password password
93 * @param consumerGroup Consumer Group
94 * @param consumerInstance Consumer Instance
95 * @param fetchTimeout Read Fetch Timeout
96 * @param fetchLimit Fetch Limit
97 * @param environment DME2 environment
98 * @param aftEnvironment DME2 AFT environment
99 * @param partner DME2 Partner
100 * @param latitude DME2 latitude
101 * @param longitude DME2 longitude
102 * @param additionalProps additional properties to pass to DME2
103 * @param managed is this endpoind managed?
104 * @param useHttps does the connection use HTTPS?
105 * @param allowSelfSignedCerts does connection allow self-signed certificates?
107 * @return an DMAAP Topic Source
108 * @throws IllegalArgumentException if invalid parameters are present
110 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
111 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
112 String environment, String aftEnvironment, String partner, String latitude, String longitude,
113 Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
116 * Instantiates a new DMAAP Topic Source
118 * @param servers list of servers
119 * @param topic topic name
120 * @param apiKey API Key
121 * @param apiSecret API Secret
123 * @return an DMAAP Topic Source
124 * @throws IllegalArgumentException if invalid parameters are present
126 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
129 * Instantiates a new DMAAP Topic Source
131 * @param servers list of servers
132 * @param topic topic name
134 * @return an DMAAP Topic Source
135 * @throws IllegalArgumentException if invalid parameters are present
137 public DmaapTopicSource build(List<String> servers, String topic);
140 * Destroys an DMAAP Topic Source based on a topic
142 * @param topic topic name
143 * @throws IllegalArgumentException if invalid parameters are present
145 public void destroy(String topic);
148 * Destroys all DMAAP Topic Sources
150 public void destroy();
153 * gets an DMAAP Topic Source based on topic name
155 * @param topic the topic name
156 * @return an DMAAP Topic Source with topic name
157 * @throws IllegalArgumentException if an invalid topic is provided
158 * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
160 public DmaapTopicSource get(String topic);
163 * Provides a snapshot of the DMAAP Topic Sources
165 * @return a list of the DMAAP Topic Sources
167 public List<DmaapTopicSource> inventory();
171 /* ------------- implementation ----------------- */
174 * Factory of DMAAP Source Topics indexed by topic name
177 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
178 private static final String MISSING_TOPIC = "A topic must be provided";
183 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
186 * DMaaP Topic Name Index
188 protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
194 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
195 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
196 String environment, String aftEnvironment, String partner, String latitude, String longitude,
197 Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
199 if (topic == null || topic.isEmpty()) {
200 throw new IllegalArgumentException(MISSING_TOPIC);
203 synchronized (this) {
204 if (dmaapTopicSources.containsKey(topic)) {
205 return dmaapTopicSources.get(topic);
208 DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
212 .apiSecret(apiSecret)
215 .consumerGroup(consumerGroup)
216 .consumerInstance(consumerInstance)
217 .fetchTimeout(fetchTimeout)
218 .fetchLimit(fetchLimit)
219 .environment(environment)
220 .aftEnvironment(aftEnvironment)
223 .longitude(longitude)
224 .additionalProps(additionalProps)
226 .allowSelfSignedCerts(allowSelfSignedCerts)
230 dmaapTopicSources.put(topic, dmaapTopicSource);
233 return dmaapTopicSource;
241 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
242 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
243 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
245 if (servers == null || servers.isEmpty()) {
246 throw new IllegalArgumentException("DMaaP Server(s) must be provided");
249 if (topic == null || topic.isEmpty()) {
250 throw new IllegalArgumentException(MISSING_TOPIC);
253 synchronized (this) {
254 if (dmaapTopicSources.containsKey(topic)) {
255 return dmaapTopicSources.get(topic);
258 DmaapTopicSource dmaapTopicSource =
259 new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
263 .apiSecret(apiSecret)
266 .consumerGroup(consumerGroup)
267 .consumerInstance(consumerInstance)
268 .fetchTimeout(fetchTimeout)
269 .fetchLimit(fetchLimit)
271 .allowSelfSignedCerts(allowSelfSignedCerts)
275 dmaapTopicSources.put(topic, dmaapTopicSource);
278 return dmaapTopicSource;
286 public List<DmaapTopicSource> build(Properties properties) {
288 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
289 if (readTopics == null || readTopics.isEmpty()) {
290 logger.info("{}: no topic for DMaaP Source", this);
291 return new ArrayList<>();
293 List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
295 List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
296 synchronized (this) {
297 for (String topic : readTopicList) {
298 if (this.dmaapTopicSources.containsKey(topic)) {
299 dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
303 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
304 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
306 List<String> serverList;
307 if (servers != null && !servers.isEmpty()) {
308 serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
310 serverList = new ArrayList<>();
313 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
314 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
316 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
317 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
319 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
320 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
322 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
323 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
325 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
326 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
328 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
329 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
331 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
332 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
334 /* DME2 Properties */
336 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
337 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
339 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
340 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
342 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
343 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
345 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
346 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
348 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
349 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
351 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
352 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
354 String dme2EpReadTimeoutMs =
355 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
356 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
358 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
359 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
361 String dme2RoundtripTimeoutMs =
362 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
363 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
365 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
366 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
368 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
369 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
371 String dme2SessionStickinessRequired =
372 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
373 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
375 Map<String, String> dme2AdditionalProps = new HashMap<>();
377 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
378 dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
380 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
381 dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
383 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
384 dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
386 if (dme2Version != null && !dme2Version.isEmpty()) {
387 dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
389 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
390 dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
392 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
393 dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
395 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
396 dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
400 if (servers == null || servers.isEmpty()) {
402 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
406 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
407 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
409 fetchTimeout = Integer.parseInt(fetchTimeoutString);
410 } catch (NumberFormatException nfe) {
411 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
416 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
417 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
418 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
419 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
421 fetchLimit = Integer.parseInt(fetchLimitString);
422 } catch (NumberFormatException nfe) {
423 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
428 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
429 + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
430 boolean managed = true;
431 if (managedString != null && !managedString.isEmpty()) {
432 managed = Boolean.parseBoolean(managedString);
435 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
436 + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
438 // default is to use HTTP if no https property exists
439 boolean useHttps = false;
440 if (useHttpsString != null && !useHttpsString.isEmpty()) {
441 useHttps = Boolean.parseBoolean(useHttpsString);
444 String allowSelfSignedCertsString =
445 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
446 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
448 // default is to disallow self-signed certs
449 boolean allowSelfSignedCerts = false;
450 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
451 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
455 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
456 aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
457 dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
458 useHttps, allowSelfSignedCerts);
460 dmaapTopicSourceLst.add(uebTopicSource);
463 return dmaapTopicSourceLst;
469 * @throws IllegalArgumentException
472 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
473 return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
474 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
480 * @throws IllegalArgumentException
483 public DmaapTopicSource build(List<String> servers, String topic) {
484 return this.build(servers, topic, null, null);
491 public void destroy(String topic) {
493 if (topic == null || topic.isEmpty()) {
494 throw new IllegalArgumentException(MISSING_TOPIC);
497 DmaapTopicSource uebTopicSource;
499 synchronized (this) {
500 if (!dmaapTopicSources.containsKey(topic)) {
504 uebTopicSource = dmaapTopicSources.remove(topic);
507 uebTopicSource.shutdown();
514 public DmaapTopicSource get(String topic) {
516 if (topic == null || topic.isEmpty()) {
517 throw new IllegalArgumentException(MISSING_TOPIC);
520 synchronized (this) {
521 if (dmaapTopicSources.containsKey(topic)) {
522 return dmaapTopicSources.get(topic);
524 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
530 public synchronized List<DmaapTopicSource> inventory() {
531 return new ArrayList<>(this.dmaapTopicSources.values());
535 public void destroy() {
536 List<DmaapTopicSource> readers = this.inventory();
537 for (DmaapTopicSource reader : readers) {
541 synchronized (this) {
542 this.dmaapTopicSources.clear();
547 public String toString() {
548 StringBuilder builder = new StringBuilder();
549 builder.append("IndexedDmaapTopicSourceFactory []");
550 return builder.toString();