/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.common.endpoints.event.comm.bus;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

36 * DMAAP Topic Source Factory
38 public interface DmaapTopicSourceFactory {
39 public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40 public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41 public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42 public final String DME2_VERSION_PROPERTY = "Version";
43 public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44 public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45 public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46 public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49 * Creates an DMAAP Topic Source based on properties files
51 * @param properties Properties containing initialization values
53 * @return an DMAAP Topic Source
54 * @throws IllegalArgumentException if invalid parameters are present
56 public List<DmaapTopicSource> build(Properties properties);
59 * Instantiates a new DMAAP Topic Source
61 * @param servers list of servers
62 * @param topic topic name
63 * @param apiKey API Key
64 * @param apiSecret API Secret
65 * @param userName user name
66 * @param password password
67 * @param consumerGroup Consumer Group
68 * @param consumerInstance Consumer Instance
69 * @param fetchTimeout Read Fetch Timeout
70 * @param fetchLimit Fetch Limit
71 * @param managed is this endpoind managed?
72 * @param useHttps does the connection use HTTPS?
73 * @param allowSelfSignedCerts does connection allow self-signed certificates?
75 * @return an DMAAP Topic Source
76 * @throws IllegalArgumentException if invalid parameters are present
78 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
79 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
80 boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
83 * Instantiates a new DMAAP Topic Source
85 * @param servers list of servers
86 * @param topic topic name
87 * @param apiKey API Key
88 * @param apiSecret API Secret
89 * @param userName user name
90 * @param password password
91 * @param consumerGroup Consumer Group
92 * @param consumerInstance Consumer Instance
93 * @param fetchTimeout Read Fetch Timeout
94 * @param fetchLimit Fetch Limit
95 * @param environment DME2 environment
96 * @param aftEnvironment DME2 AFT environment
97 * @param partner DME2 Partner
98 * @param latitude DME2 latitude
99 * @param longitude DME2 longitude
100 * @param additionalProps additional properties to pass to DME2
101 * @param managed is this endpoind managed?
102 * @param useHttps does the connection use HTTPS?
103 * @param allowSelfSignedCerts does connection allow self-signed certificates?
105 * @return an DMAAP Topic Source
106 * @throws IllegalArgumentException if invalid parameters are present
108 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
109 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
110 String environment, String aftEnvironment, String partner, String latitude, String longitude,
111 Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
114 * Instantiates a new DMAAP Topic Source
116 * @param servers list of servers
117 * @param topic topic name
118 * @param apiKey API Key
119 * @param apiSecret API Secret
121 * @return an DMAAP Topic Source
122 * @throws IllegalArgumentException if invalid parameters are present
124 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
127 * Instantiates a new DMAAP Topic Source
129 * @param servers list of servers
130 * @param topic topic name
132 * @return an DMAAP Topic Source
133 * @throws IllegalArgumentException if invalid parameters are present
135 public DmaapTopicSource build(List<String> servers, String topic);
138 * Destroys an DMAAP Topic Source based on a topic
140 * @param topic topic name
141 * @throws IllegalArgumentException if invalid parameters are present
143 public void destroy(String topic);
146 * Destroys all DMAAP Topic Sources
148 public void destroy();
151 * gets an DMAAP Topic Source based on topic name
153 * @param topic the topic name
154 * @return an DMAAP Topic Source with topic name
155 * @throws IllegalArgumentException if an invalid topic is provided
156 * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
158 public DmaapTopicSource get(String topic);
161 * Provides a snapshot of the DMAAP Topic Sources
163 * @return a list of the DMAAP Topic Sources
165 public List<DmaapTopicSource> inventory();
/* ------------- implementation ----------------- */

172 * Factory of DMAAP Source Topics indexed by topic name
175 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
176 private static final String MISSING_TOPIC = "A topic must be provided";
181 private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
184 * DMaaP Topic Name Index
186 protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
192 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
193 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
194 String environment, String aftEnvironment, String partner, String latitude, String longitude,
195 Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
197 if (topic == null || topic.isEmpty()) {
198 throw new IllegalArgumentException(MISSING_TOPIC);
201 synchronized (this) {
202 if (dmaapTopicSources.containsKey(topic)) {
203 return dmaapTopicSources.get(topic);
206 DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret,
207 userName, password, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, environment,
208 aftEnvironment, partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
211 dmaapTopicSources.put(topic, dmaapTopicSource);
214 return dmaapTopicSource;
222 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
223 String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
224 boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
226 if (servers == null || servers.isEmpty()) {
227 throw new IllegalArgumentException("DMaaP Server(s) must be provided");
230 if (topic == null || topic.isEmpty()) {
231 throw new IllegalArgumentException(MISSING_TOPIC);
234 synchronized (this) {
235 if (dmaapTopicSources.containsKey(topic)) {
236 return dmaapTopicSources.get(topic);
239 DmaapTopicSource dmaapTopicSource =
240 new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password,
241 consumerGroup, consumerInstance, fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
244 dmaapTopicSources.put(topic, dmaapTopicSource);
247 return dmaapTopicSource;
255 public List<DmaapTopicSource> build(Properties properties) {
257 String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
258 if (readTopics == null || readTopics.isEmpty()) {
259 logger.info("{}: no topic for DMaaP Source", this);
260 return new ArrayList<>();
262 List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
264 List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
265 synchronized (this) {
266 for (String topic : readTopicList) {
267 if (this.dmaapTopicSources.containsKey(topic)) {
268 dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
272 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
273 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
275 List<String> serverList;
276 if (servers != null && !servers.isEmpty()) {
277 serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
279 serverList = new ArrayList<>();
282 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
283 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
285 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
286 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
288 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
289 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
291 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
292 + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
294 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
295 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
297 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
298 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
300 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
301 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
303 /* DME2 Properties */
305 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
306 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
308 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
309 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
311 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
312 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
314 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
315 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
317 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
318 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
320 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
321 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
323 String dme2EpReadTimeoutMs =
324 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
325 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
327 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
328 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
330 String dme2RoundtripTimeoutMs =
331 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
332 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
334 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
335 + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
337 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
338 + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
340 String dme2SessionStickinessRequired =
341 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
342 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
344 Map<String, String> dme2AdditionalProps = new HashMap<>();
346 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
347 dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
349 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
350 dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
352 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
353 dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
355 if (dme2Version != null && !dme2Version.isEmpty()) {
356 dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
358 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
359 dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
361 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
362 dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
364 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
365 dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
369 if (servers == null || servers.isEmpty()) {
371 logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
375 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
376 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
378 fetchTimeout = Integer.parseInt(fetchTimeoutString);
379 } catch (NumberFormatException nfe) {
380 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
385 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
386 + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
387 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
388 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
390 fetchLimit = Integer.parseInt(fetchLimitString);
391 } catch (NumberFormatException nfe) {
392 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
397 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
398 + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
399 boolean managed = true;
400 if (managedString != null && !managedString.isEmpty()) {
401 managed = Boolean.parseBoolean(managedString);
404 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
405 + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
407 // default is to use HTTP if no https property exists
408 boolean useHttps = false;
409 if (useHttpsString != null && !useHttpsString.isEmpty()) {
410 useHttps = Boolean.parseBoolean(useHttpsString);
413 String allowSelfSignedCertsString =
414 properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
415 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
417 // default is to disallow self-signed certs
418 boolean allowSelfSignedCerts = false;
419 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
420 allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
424 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
425 aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
426 dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
427 useHttps, allowSelfSignedCerts);
429 dmaapTopicSourceLst.add(uebTopicSource);
432 return dmaapTopicSourceLst;
438 * @throws IllegalArgumentException
441 public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
442 return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
443 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
449 * @throws IllegalArgumentException
452 public DmaapTopicSource build(List<String> servers, String topic) {
453 return this.build(servers, topic, null, null);
460 public void destroy(String topic) {
462 if (topic == null || topic.isEmpty()) {
463 throw new IllegalArgumentException(MISSING_TOPIC);
466 DmaapTopicSource uebTopicSource;
468 synchronized (this) {
469 if (!dmaapTopicSources.containsKey(topic)) {
473 uebTopicSource = dmaapTopicSources.remove(topic);
476 uebTopicSource.shutdown();
483 public DmaapTopicSource get(String topic) {
485 if (topic == null || topic.isEmpty()) {
486 throw new IllegalArgumentException(MISSING_TOPIC);
489 synchronized (this) {
490 if (dmaapTopicSources.containsKey(topic)) {
491 return dmaapTopicSources.get(topic);
493 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
499 public synchronized List<DmaapTopicSource> inventory() {
500 return new ArrayList<>(this.dmaapTopicSources.values());
504 public void destroy() {
505 List<DmaapTopicSource> readers = this.inventory();
506 for (DmaapTopicSource reader : readers) {
510 synchronized (this) {
511 this.dmaapTopicSources.clear();
516 public String toString() {
517 StringBuilder builder = new StringBuilder();
518 builder.append("IndexedDmaapTopicSourceFactory []");
519 return builder.toString();