/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.policy.common.endpoints.event.comm.bus;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
37 * DMAAP Topic Source Factory
39 public interface DmaapTopicSourceFactory {
40 public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
41 public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
42 public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
43 public final String DME2_VERSION_PROPERTY = "Version";
44 public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
45 public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
46 public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
47 public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
50 * Creates an DMAAP Topic Source based on properties files
52 * @param properties Properties containing initialization values
54 * @return an DMAAP Topic Source
55 * @throws IllegalArgumentException if invalid parameters are present
57 public List<DmaapTopicSource> build(Properties properties);
60 * Instantiates a new DMAAP Topic Source
62 * @param servers list of servers
63 * @param topic topic name
64 * @param apiKey API Key
65 * @param apiSecret API Secret
66 * @param userName user name
67 * @param password password
68 * @param consumerGroup Consumer Group
69 * @param consumerInstance Consumer Instance
70 * @param fetchTimeout Read Fetch Timeout
71 * @param fetchLimit Fetch Limit
72 * @param managed is this endpoind managed?
73 * @param useHttps does the connection use HTTPS?
74 * @param allowSelfSignedCerts does connection allow self-signed certificates?
76 * @return an DMAAP Topic Source
77 * @throws IllegalArgumentException if invalid parameters are present
79 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
80 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
81 boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
84 * Instantiates a new DMAAP Topic Source
86 * @param servers list of servers
87 * @param topic topic name
88 * @param apiKey API Key
89 * @param apiSecret API Secret
90 * @param userName user name
91 * @param password password
92 * @param consumerGroup Consumer Group
93 * @param consumerInstance Consumer Instance
94 * @param fetchTimeout Read Fetch Timeout
95 * @param fetchLimit Fetch Limit
96 * @param environment DME2 environment
97 * @param aftEnvironment DME2 AFT environment
98 * @param partner DME2 Partner
99 * @param latitude DME2 latitude
100 * @param longitude DME2 longitude
101 * @param additionalProps additional properties to pass to DME2
102 * @param managed is this endpoind managed?
103 * @param useHttps does the connection use HTTPS?
104 * @param allowSelfSignedCerts does connection allow self-signed certificates?
106 * @return an DMAAP Topic Source
107 * @throws IllegalArgumentException if invalid parameters are present
109 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
110 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
111 String environment, String aftEnvironment, String partner, String latitude, String longitude,
112 Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
115 * Instantiates a new DMAAP Topic Source
117 * @param servers list of servers
118 * @param topic topic name
119 * @param apiKey API Key
120 * @param apiSecret API Secret
122 * @return an DMAAP Topic Source
123 * @throws IllegalArgumentException if invalid parameters are present
125 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
128 * Instantiates a new DMAAP Topic Source
130 * @param servers list of servers
131 * @param topic topic name
133 * @return an DMAAP Topic Source
134 * @throws IllegalArgumentException if invalid parameters are present
136 public DmaapTopicSource build(List<String> servers, String topic);
139 * Destroys an DMAAP Topic Source based on a topic
141 * @param topic topic name
142 * @throws IllegalArgumentException if invalid parameters are present
144 public void destroy(String topic);
147 * Destroys all DMAAP Topic Sources
149 public void destroy();
152 * gets an DMAAP Topic Source based on topic name
154 * @param topic the topic name
155 * @return an DMAAP Topic Source with topic name
156 * @throws IllegalArgumentException if an invalid topic is provided
157 * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
159 public DmaapTopicSource get(String topic);
162 * Provides a snapshot of the DMAAP Topic Sources
164 * @return a list of the DMAAP Topic Sources
166 public List<DmaapTopicSource> inventory();
170 /* ------------- implementation ----------------- */
173 * Factory of DMAAP Source Topics indexed by topic name
176 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
177 private static final String MISSING_TOPIC = "A topic must be provided";
182 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
185 * DMaaP Topic Name Index
187 protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
193 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
194 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
195 String environment, String aftEnvironment, String partner, String latitude, String longitude,
196 Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
198 if (topic == null || topic.isEmpty()) {
199 throw new IllegalArgumentException(MISSING_TOPIC);
202 synchronized (this) {
203 if (dmaapTopicSources.containsKey(topic)) {
204 return dmaapTopicSources.get(topic);
207 DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
211 .apiSecret(apiSecret)
214 .consumerGroup(consumerGroup)
215 .consumerInstance(consumerInstance)
216 .fetchTimeout(fetchTimeout)
217 .fetchLimit(fetchLimit)
218 .environment(environment)
219 .aftEnvironment(aftEnvironment)
222 .longitude(longitude)
223 .additionalProps(additionalProps)
225 .allowSelfSignedCerts(allowSelfSignedCerts)
229 dmaapTopicSources.put(topic, dmaapTopicSource);
232 return dmaapTopicSource;
240 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
241 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
242 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
244 if (servers == null || servers.isEmpty()) {
245 throw new IllegalArgumentException("DMaaP Server(s) must be provided");
248 if (topic == null || topic.isEmpty()) {
249 throw new IllegalArgumentException(MISSING_TOPIC);
252 synchronized (this) {
253 if (dmaapTopicSources.containsKey(topic)) {
254 return dmaapTopicSources.get(topic);
257 DmaapTopicSource dmaapTopicSource =
258 new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
262 .apiSecret(apiSecret)
265 .consumerGroup(consumerGroup)
266 .consumerInstance(consumerInstance)
267 .fetchTimeout(fetchTimeout)
268 .fetchLimit(fetchLimit)
270 .allowSelfSignedCerts(allowSelfSignedCerts)
274 dmaapTopicSources.put(topic, dmaapTopicSource);
277 return dmaapTopicSource;
285 public List<DmaapTopicSource> build(Properties properties) {
287 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
288 if (readTopics == null || readTopics.isEmpty()) {
289 logger.info("{}: no topic for DMaaP Source", this);
290 return new ArrayList<>();
292 List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
294 List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
295 synchronized (this) {
296 for (String topic : readTopicList) {
297 if (this.dmaapTopicSources.containsKey(topic)) {
298 dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
302 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
303 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
305 List<String> serverList;
306 if (servers != null && !servers.isEmpty()) {
307 serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
309 serverList = new ArrayList<>();
312 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
313 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
315 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
316 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
318 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
319 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
321 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
322 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
324 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
325 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
327 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
328 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
330 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
331 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
333 /* DME2 Properties */
335 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
336 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
338 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
339 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
341 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
342 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
344 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
345 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
347 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
348 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
350 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
351 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
353 String dme2EpReadTimeoutMs =
354 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
355 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
357 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
358 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
360 String dme2RoundtripTimeoutMs =
361 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
362 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
364 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
365 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
367 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
368 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
370 String dme2SessionStickinessRequired =
371 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
372 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
374 Map<String, String> dme2AdditionalProps = new HashMap<>();
376 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
377 dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
379 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
380 dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
382 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
383 dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
385 if (dme2Version != null && !dme2Version.isEmpty()) {
386 dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
388 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
389 dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
391 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
392 dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
394 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
395 dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
399 if (servers == null || servers.isEmpty()) {
401 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
405 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
406 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
408 fetchTimeout = Integer.parseInt(fetchTimeoutString);
409 } catch (NumberFormatException nfe) {
410 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
415 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
416 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
417 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
418 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
420 fetchLimit = Integer.parseInt(fetchLimitString);
421 } catch (NumberFormatException nfe) {
422 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
427 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
428 + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
429 boolean managed = true;
430 if (managedString != null && !managedString.isEmpty()) {
431 managed = Boolean.parseBoolean(managedString);
434 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
435 + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
437 // default is to use HTTP if no https property exists
438 boolean useHttps = false;
439 if (useHttpsString != null && !useHttpsString.isEmpty()) {
440 useHttps = Boolean.parseBoolean(useHttpsString);
443 String allowSelfSignedCertsString =
444 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
445 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
447 // default is to disallow self-signed certs
448 boolean allowSelfSignedCerts = false;
449 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
450 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
454 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
455 aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
456 dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
457 useHttps, allowSelfSignedCerts);
459 dmaapTopicSourceLst.add(uebTopicSource);
462 return dmaapTopicSourceLst;
468 * @throws IllegalArgumentException
471 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
472 return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
473 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
479 * @throws IllegalArgumentException
482 public DmaapTopicSource build(List<String> servers, String topic) {
483 return this.build(servers, topic, null, null);
490 public void destroy(String topic) {
492 if (topic == null || topic.isEmpty()) {
493 throw new IllegalArgumentException(MISSING_TOPIC);
496 DmaapTopicSource uebTopicSource;
498 synchronized (this) {
499 if (!dmaapTopicSources.containsKey(topic)) {
503 uebTopicSource = dmaapTopicSources.remove(topic);
506 uebTopicSource.shutdown();
513 public DmaapTopicSource get(String topic) {
515 if (topic == null || topic.isEmpty()) {
516 throw new IllegalArgumentException(MISSING_TOPIC);
519 synchronized (this) {
520 if (dmaapTopicSources.containsKey(topic)) {
521 return dmaapTopicSources.get(topic);
523 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
529 public synchronized List<DmaapTopicSource> inventory() {
530 return new ArrayList<>(this.dmaapTopicSources.values());
534 public void destroy() {
535 List<DmaapTopicSource> readers = this.inventory();
536 for (DmaapTopicSource reader : readers) {
540 synchronized (this) {
541 this.dmaapTopicSources.clear();
546 public String toString() {
547 StringBuilder builder = new StringBuilder();
548 builder.append("IndexedDmaapTopicSourceFactory []");
549 return builder.toString();