4285b3a9af0cbd0adef7ab68433fa3a2ab325211
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedBusTopicSource;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * DMAAP Topic Source Factory
38  */
39 public interface DmaapTopicSourceFactory {
40     public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
41     public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
42     public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
43     public final String DME2_VERSION_PROPERTY = "Version";
44     public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
45     public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
46     public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
47     public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
48
49     /**
50      * Creates an DMAAP Topic Source based on properties files
51      * 
52      * @param properties Properties containing initialization values
53      * 
54      * @return an DMAAP Topic Source
55      * @throws IllegalArgumentException if invalid parameters are present
56      */
57     public List<DmaapTopicSource> build(Properties properties);
58
59     /**
60      * Instantiates a new DMAAP Topic Source
61      * 
62      * @param servers list of servers
63      * @param topic topic name
64      * @param apiKey API Key
65      * @param apiSecret API Secret
66      * @param userName user name
67      * @param password password
68      * @param consumerGroup Consumer Group
69      * @param consumerInstance Consumer Instance
70      * @param fetchTimeout Read Fetch Timeout
71      * @param fetchLimit Fetch Limit
72      * @param managed is this endpoind managed?
73      * @param useHttps does the connection use HTTPS?
74      * @param allowSelfSignedCerts does connection allow self-signed certificates?
75      * 
76      * @return an DMAAP Topic Source
77      * @throws IllegalArgumentException if invalid parameters are present
78      */
79     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
80                                   String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
81                                   boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
82
83     /**
84      * Instantiates a new DMAAP Topic Source
85      *
86      * @param servers list of servers
87      * @param topic topic name
88      * @param apiKey API Key
89      * @param apiSecret API Secret
90      * @param userName user name
91      * @param password password
92      * @param consumerGroup Consumer Group
93      * @param consumerInstance Consumer Instance
94      * @param fetchTimeout Read Fetch Timeout
95      * @param fetchLimit Fetch Limit
96      * @param environment DME2 environment
97      * @param aftEnvironment DME2 AFT environment
98      * @param partner DME2 Partner
99      * @param latitude DME2 latitude
100      * @param longitude DME2 longitude
101      * @param additionalProps additional properties to pass to DME2
102      * @param managed is this endpoind managed?
103      * @param useHttps does the connection use HTTPS?
104      * @param allowSelfSignedCerts does connection allow self-signed certificates?
105      *
106      * @return an DMAAP Topic Source
107      * @throws IllegalArgumentException if invalid parameters are present
108      */
109     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
110                                   String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
111                                   String environment, String aftEnvironment, String partner, String latitude, String longitude,
112                                   Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
113
114     /**
115      * Instantiates a new DMAAP Topic Source
116      * 
117      * @param servers list of servers
118      * @param topic topic name
119      * @param apiKey API Key
120      * @param apiSecret API Secret
121      * 
122      * @return an DMAAP Topic Source
123      * @throws IllegalArgumentException if invalid parameters are present
124      */
125     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
126
127     /**
128      * Instantiates a new DMAAP Topic Source
129      * 
130      * @param servers list of servers
131      * @param topic topic name
132      * 
133      * @return an DMAAP Topic Source
134      * @throws IllegalArgumentException if invalid parameters are present
135      */
136     public DmaapTopicSource build(List<String> servers, String topic);
137
138     /**
139      * Destroys an DMAAP Topic Source based on a topic
140      * 
141      * @param topic topic name
142      * @throws IllegalArgumentException if invalid parameters are present
143      */
144     public void destroy(String topic);
145
146     /**
147      * Destroys all DMAAP Topic Sources
148      */
149     public void destroy();
150
151     /**
152      * gets an DMAAP Topic Source based on topic name
153      * 
154      * @param topic the topic name
155      * @return an DMAAP Topic Source with topic name
156      * @throws IllegalArgumentException if an invalid topic is provided
157      * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
158      */
159     public DmaapTopicSource get(String topic);
160
161     /**
162      * Provides a snapshot of the DMAAP Topic Sources
163      * 
164      * @return a list of the DMAAP Topic Sources
165      */
166     public List<DmaapTopicSource> inventory();
167 }
168
169
170 /* ------------- implementation ----------------- */
171
172 /**
173  * Factory of DMAAP Source Topics indexed by topic name
174  */
175
176 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
177     private static final String MISSING_TOPIC = "A topic must be provided";
178
179     /**
180      * Logger
181      */
182     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
183
184     /**
185      * DMaaP Topic Name Index
186      */
187     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
188
189     /**
190      * {@inheritDoc}
191      */
192     @Override
193     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
194             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
195             String environment, String aftEnvironment, String partner, String latitude, String longitude,
196             Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
197
198         if (topic == null || topic.isEmpty()) {
199             throw new IllegalArgumentException(MISSING_TOPIC);
200         }
201
202         synchronized (this) {
203             if (dmaapTopicSources.containsKey(topic)) {
204                 return dmaapTopicSources.get(topic);
205             }
206
207             DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
208                     .servers(servers)
209                     .topic(topic)
210                     .apiKey(apiKey)
211                     .apiSecret(apiSecret)
212                     .userName(userName)
213                     .password(password)
214                     .consumerGroup(consumerGroup)
215                     .consumerInstance(consumerInstance)
216                     .fetchTimeout(fetchTimeout)
217                     .fetchLimit(fetchLimit)
218                     .environment(environment)
219                     .aftEnvironment(aftEnvironment)
220                     .partner(partner)
221                     .latitude(latitude)
222                     .longitude(longitude)
223                     .additionalProps(additionalProps)
224                     .useHttps(useHttps)
225                     .allowSelfSignedCerts(allowSelfSignedCerts)
226                     .build());
227
228             if (managed) {
229                 dmaapTopicSources.put(topic, dmaapTopicSource);
230             }
231
232             return dmaapTopicSource;
233         }
234     }
235
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
241             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
242             boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
243
244         if (servers == null || servers.isEmpty()) {
245             throw new IllegalArgumentException("DMaaP Server(s) must be provided");
246         }
247
248         if (topic == null || topic.isEmpty()) {
249             throw new IllegalArgumentException(MISSING_TOPIC);
250         }
251
252         synchronized (this) {
253             if (dmaapTopicSources.containsKey(topic)) {
254                 return dmaapTopicSources.get(topic);
255             }
256
257             DmaapTopicSource dmaapTopicSource =
258                     new SingleThreadedDmaapTopicSource(SingleThreadedBusTopicSource.BusTopicParams.builder()
259                             .servers(servers)
260                             .topic(topic)
261                             .apiKey(apiKey)
262                             .apiSecret(apiSecret)
263                             .userName(userName)
264                             .password(password)
265                             .consumerGroup(consumerGroup)
266                             .consumerInstance(consumerInstance)
267                             .fetchTimeout(fetchTimeout)
268                             .fetchLimit(fetchLimit)
269                             .useHttps(useHttps)
270                             .allowSelfSignedCerts(allowSelfSignedCerts)
271                             .build());
272
273             if (managed) {
274                 dmaapTopicSources.put(topic, dmaapTopicSource);
275             }
276
277             return dmaapTopicSource;
278         }
279     }
280
281     /**
282      * {@inheritDoc}
283      */
284     @Override
285     public List<DmaapTopicSource> build(Properties properties) {
286
287         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
288         if (readTopics == null || readTopics.isEmpty()) {
289             logger.info("{}: no topic for DMaaP Source", this);
290             return new ArrayList<>();
291         }
292         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
293
294         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
295         synchronized (this) {
296             for (String topic : readTopicList) {
297                 if (this.dmaapTopicSources.containsKey(topic)) {
298                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
299                     continue;
300                 }
301
302                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
303                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
304
305                 List<String> serverList;
306                 if (servers != null && !servers.isEmpty()) {
307                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
308                 } else {
309                     serverList = new ArrayList<>();
310                 }
311
312                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
313                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
314
315                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
316                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
317
318                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
319                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
320
321                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
322                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
323
324                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
325                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
326
327                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
328                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
329
330                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
331                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
332
333                 /* DME2 Properties */
334
335                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
336                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
337
338                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
339                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
340
341                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
342                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
343
344                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
345                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
346
347                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
348                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
349
350                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
351                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
352
353                 String dme2EpReadTimeoutMs =
354                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
355                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
356
357                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
358                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
359
360                 String dme2RoundtripTimeoutMs =
361                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
362                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
363
364                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
365                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
366
367                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
368                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
369
370                 String dme2SessionStickinessRequired =
371                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
372                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
373
374                 Map<String, String> dme2AdditionalProps = new HashMap<>();
375
376                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
377                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
378                 }
379                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
380                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
381                 }
382                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
383                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
384                 }
385                 if (dme2Version != null && !dme2Version.isEmpty()) {
386                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
387                 }
388                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
389                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
390                 }
391                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
392                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
393                 }
394                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
395                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
396                 }
397
398
399                 if (servers == null || servers.isEmpty()) {
400
401                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
402                     continue;
403                 }
404
405                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
406                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
407                     try {
408                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
409                     } catch (NumberFormatException nfe) {
410                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
411                                 topic);
412                     }
413                 }
414
415                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
416                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
417                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
418                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
419                     try {
420                         fetchLimit = Integer.parseInt(fetchLimitString);
421                     } catch (NumberFormatException nfe) {
422                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
423                                 topic);
424                     }
425                 }
426
427                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
428                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
429                 boolean managed = true;
430                 if (managedString != null && !managedString.isEmpty()) {
431                     managed = Boolean.parseBoolean(managedString);
432                 }
433
434                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
435                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
436
437                 // default is to use HTTP if no https property exists
438                 boolean useHttps = false;
439                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
440                     useHttps = Boolean.parseBoolean(useHttpsString);
441                 }
442
443                 String allowSelfSignedCertsString =
444                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
445                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
446
447                 // default is to disallow self-signed certs
448                 boolean allowSelfSignedCerts = false;
449                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
450                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
451                 }
452
453
454                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
455                         aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
456                         dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
457                         useHttps, allowSelfSignedCerts);
458
459                 dmaapTopicSourceLst.add(uebTopicSource);
460             }
461         }
462         return dmaapTopicSourceLst;
463     }
464
465     /**
466      * {@inheritDoc}
467      * 
468      * @throws IllegalArgumentException
469      */
470     @Override
471     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
472         return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
473                 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
474     }
475
476     /**
477      * {@inheritDoc}
478      * 
479      * @throws IllegalArgumentException
480      */
481     @Override
482     public DmaapTopicSource build(List<String> servers, String topic) {
483         return this.build(servers, topic, null, null);
484     }
485
486     /**
487      * {@inheritDoc}
488      */
489     @Override
490     public void destroy(String topic) {
491
492         if (topic == null || topic.isEmpty()) {
493             throw new IllegalArgumentException(MISSING_TOPIC);
494         }
495
496         DmaapTopicSource uebTopicSource;
497
498         synchronized (this) {
499             if (!dmaapTopicSources.containsKey(topic)) {
500                 return;
501             }
502
503             uebTopicSource = dmaapTopicSources.remove(topic);
504         }
505
506         uebTopicSource.shutdown();
507     }
508
509     /**
510      * {@inheritDoc}
511      */
512     @Override
513     public DmaapTopicSource get(String topic) {
514
515         if (topic == null || topic.isEmpty()) {
516             throw new IllegalArgumentException(MISSING_TOPIC);
517         }
518
519         synchronized (this) {
520             if (dmaapTopicSources.containsKey(topic)) {
521                 return dmaapTopicSources.get(topic);
522             } else {
523                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
524             }
525         }
526     }
527
528     @Override
529     public synchronized List<DmaapTopicSource> inventory() {
530         return new ArrayList<>(this.dmaapTopicSources.values());
531     }
532
533     @Override
534     public void destroy() {
535         List<DmaapTopicSource> readers = this.inventory();
536         for (DmaapTopicSource reader : readers) {
537             reader.shutdown();
538         }
539
540         synchronized (this) {
541             this.dmaapTopicSources.clear();
542         }
543     }
544
545     @Override
546     public String toString() {
547         StringBuilder builder = new StringBuilder();
548         builder.append("IndexedDmaapTopicSourceFactory []");
549         return builder.toString();
550     }
551
552 }
553