11dfd292cf17640c6e55b5c06215775de04b2817
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Source Factory
39  */
40 public interface DmaapTopicSourceFactory {
41     public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42     public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43     public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44     public final String DME2_VERSION_PROPERTY = "Version";
45     public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46     public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47     public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48     public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49
50     /**
51      * Creates an DMAAP Topic Source based on properties files
52      * 
53      * @param properties Properties containing initialization values
54      * 
55      * @return an DMAAP Topic Source
56      * @throws IllegalArgumentException if invalid parameters are present
57      */
58     public List<DmaapTopicSource> build(Properties properties);
59
60     /**
61      * Instantiates a new DMAAP Topic Source
62      * 
63      * @param servers list of servers
64      * @param topic topic name
65      * @param apiKey API Key
66      * @param apiSecret API Secret
67      * @param userName user name
68      * @param password password
69      * @param consumerGroup Consumer Group
70      * @param consumerInstance Consumer Instance
71      * @param fetchTimeout Read Fetch Timeout
72      * @param fetchLimit Fetch Limit
73      * @param managed is this endpoind managed?
74      * @param useHttps does the connection use HTTPS?
75      * @param allowSelfSignedCerts does connection allow self-signed certificates?
76      * 
77      * @return an DMAAP Topic Source
78      * @throws IllegalArgumentException if invalid parameters are present
79      */
80     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
81                                   String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
82                                   boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
83
84     /**
85      * Instantiates a new DMAAP Topic Source
86      *
87      * @param servers list of servers
88      * @param topic topic name
89      * @param apiKey API Key
90      * @param apiSecret API Secret
91      * @param userName user name
92      * @param password password
93      * @param consumerGroup Consumer Group
94      * @param consumerInstance Consumer Instance
95      * @param fetchTimeout Read Fetch Timeout
96      * @param fetchLimit Fetch Limit
97      * @param environment DME2 environment
98      * @param aftEnvironment DME2 AFT environment
99      * @param partner DME2 Partner
100      * @param latitude DME2 latitude
101      * @param longitude DME2 longitude
102      * @param additionalProps additional properties to pass to DME2
103      * @param managed is this endpoind managed?
104      * @param useHttps does the connection use HTTPS?
105      * @param allowSelfSignedCerts does connection allow self-signed certificates?
106      *
107      * @return an DMAAP Topic Source
108      * @throws IllegalArgumentException if invalid parameters are present
109      */
110     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
111                                   String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
112                                   String environment, String aftEnvironment, String partner, String latitude, String longitude,
113                                   Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
114
115     /**
116      * Instantiates a new DMAAP Topic Source
117      * 
118      * @param servers list of servers
119      * @param topic topic name
120      * @param apiKey API Key
121      * @param apiSecret API Secret
122      * 
123      * @return an DMAAP Topic Source
124      * @throws IllegalArgumentException if invalid parameters are present
125      */
126     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
127
128     /**
129      * Instantiates a new DMAAP Topic Source
130      * 
131      * @param servers list of servers
132      * @param topic topic name
133      * 
134      * @return an DMAAP Topic Source
135      * @throws IllegalArgumentException if invalid parameters are present
136      */
137     public DmaapTopicSource build(List<String> servers, String topic);
138
139     /**
140      * Destroys an DMAAP Topic Source based on a topic
141      * 
142      * @param topic topic name
143      * @throws IllegalArgumentException if invalid parameters are present
144      */
145     public void destroy(String topic);
146
147     /**
148      * Destroys all DMAAP Topic Sources
149      */
150     public void destroy();
151
152     /**
153      * gets an DMAAP Topic Source based on topic name
154      * 
155      * @param topic the topic name
156      * @return an DMAAP Topic Source with topic name
157      * @throws IllegalArgumentException if an invalid topic is provided
158      * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
159      */
160     public DmaapTopicSource get(String topic);
161
162     /**
163      * Provides a snapshot of the DMAAP Topic Sources
164      * 
165      * @return a list of the DMAAP Topic Sources
166      */
167     public List<DmaapTopicSource> inventory();
168 }
169
170
171 /* ------------- implementation ----------------- */
172
173 /**
174  * Factory of DMAAP Source Topics indexed by topic name
175  */
176
177 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
178     private static final String MISSING_TOPIC = "A topic must be provided";
179
180     /**
181      * Logger
182      */
183     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
184
185     /**
186      * DMaaP Topic Name Index
187      */
188     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
189
190     /**
191      * {@inheritDoc}
192      */
193     @Override
194     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
195             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
196             String environment, String aftEnvironment, String partner, String latitude, String longitude,
197             Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
198
199         if (topic == null || topic.isEmpty()) {
200             throw new IllegalArgumentException(MISSING_TOPIC);
201         }
202
203         synchronized (this) {
204             if (dmaapTopicSources.containsKey(topic)) {
205                 return dmaapTopicSources.get(topic);
206             }
207
208             DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
209                     .servers(servers)
210                     .topic(topic)
211                     .apiKey(apiKey)
212                     .apiSecret(apiSecret)
213                     .userName(userName)
214                     .password(password)
215                     .consumerGroup(consumerGroup)
216                     .consumerInstance(consumerInstance)
217                     .fetchTimeout(fetchTimeout)
218                     .fetchLimit(fetchLimit)
219                     .environment(environment)
220                     .aftEnvironment(aftEnvironment)
221                     .partner(partner)
222                     .latitude(latitude)
223                     .longitude(longitude)
224                     .additionalProps(additionalProps)
225                     .useHttps(useHttps)
226                     .allowSelfSignedCerts(allowSelfSignedCerts)
227                     .build());
228
229             if (managed) {
230                 dmaapTopicSources.put(topic, dmaapTopicSource);
231             }
232
233             return dmaapTopicSource;
234         }
235     }
236
237     /**
238      * {@inheritDoc}
239      */
240     @Override
241     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
242             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
243             boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
244
245         if (servers == null || servers.isEmpty()) {
246             throw new IllegalArgumentException("DMaaP Server(s) must be provided");
247         }
248
249         if (topic == null || topic.isEmpty()) {
250             throw new IllegalArgumentException(MISSING_TOPIC);
251         }
252
253         synchronized (this) {
254             if (dmaapTopicSources.containsKey(topic)) {
255                 return dmaapTopicSources.get(topic);
256             }
257
258             DmaapTopicSource dmaapTopicSource =
259                     new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
260                             .servers(servers)
261                             .topic(topic)
262                             .apiKey(apiKey)
263                             .apiSecret(apiSecret)
264                             .userName(userName)
265                             .password(password)
266                             .consumerGroup(consumerGroup)
267                             .consumerInstance(consumerInstance)
268                             .fetchTimeout(fetchTimeout)
269                             .fetchLimit(fetchLimit)
270                             .useHttps(useHttps)
271                             .allowSelfSignedCerts(allowSelfSignedCerts)
272                             .build());
273
274             if (managed) {
275                 dmaapTopicSources.put(topic, dmaapTopicSource);
276             }
277
278             return dmaapTopicSource;
279         }
280     }
281
282     /**
283      * {@inheritDoc}
284      */
285     @Override
286     public List<DmaapTopicSource> build(Properties properties) {
287
288         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
289         if (readTopics == null || readTopics.isEmpty()) {
290             logger.info("{}: no topic for DMaaP Source", this);
291             return new ArrayList<>();
292         }
293         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
294
295         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
296         synchronized (this) {
297             for (String topic : readTopicList) {
298                 if (this.dmaapTopicSources.containsKey(topic)) {
299                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
300                     continue;
301                 }
302
303                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
304                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
305
306                 List<String> serverList;
307                 if (servers != null && !servers.isEmpty()) {
308                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
309                 } else {
310                     serverList = new ArrayList<>();
311                 }
312
313                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
314                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
315
316                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
317                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
318
319                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
320                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
321
322                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
323                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
324
325                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
326                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
327
328                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
329                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
330
331                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
332                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
333
334                 /* DME2 Properties */
335
336                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
337                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
338
339                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
340                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
341
342                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
343                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
344
345                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
346                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
347
348                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
349                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
350
351                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
352                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
353
354                 String dme2EpReadTimeoutMs =
355                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
356                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
357
358                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
359                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
360
361                 String dme2RoundtripTimeoutMs =
362                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
363                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
364
365                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
366                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
367
368                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
369                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
370
371                 String dme2SessionStickinessRequired =
372                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
373                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
374
375                 Map<String, String> dme2AdditionalProps = new HashMap<>();
376
377                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
378                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
379                 }
380                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
381                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
382                 }
383                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
384                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
385                 }
386                 if (dme2Version != null && !dme2Version.isEmpty()) {
387                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
388                 }
389                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
390                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
391                 }
392                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
393                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
394                 }
395                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
396                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
397                 }
398
399
400                 if (servers == null || servers.isEmpty()) {
401
402                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
403                     continue;
404                 }
405
406                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
407                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
408                     try {
409                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
410                     } catch (NumberFormatException nfe) {
411                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
412                                 topic);
413                     }
414                 }
415
416                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
417                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
418                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
419                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
420                     try {
421                         fetchLimit = Integer.parseInt(fetchLimitString);
422                     } catch (NumberFormatException nfe) {
423                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
424                                 topic);
425                     }
426                 }
427
428                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
429                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
430                 boolean managed = true;
431                 if (managedString != null && !managedString.isEmpty()) {
432                     managed = Boolean.parseBoolean(managedString);
433                 }
434
435                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
436                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
437
438                 // default is to use HTTP if no https property exists
439                 boolean useHttps = false;
440                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
441                     useHttps = Boolean.parseBoolean(useHttpsString);
442                 }
443
444                 String allowSelfSignedCertsString =
445                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
446                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
447
448                 // default is to disallow self-signed certs
449                 boolean allowSelfSignedCerts = false;
450                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
451                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
452                 }
453
454
455                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
456                         aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
457                         dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
458                         useHttps, allowSelfSignedCerts);
459
460                 dmaapTopicSourceLst.add(uebTopicSource);
461             }
462         }
463         return dmaapTopicSourceLst;
464     }
465
466     /**
467      * {@inheritDoc}
468      * 
469      * @throws IllegalArgumentException
470      */
471     @Override
472     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
473         return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
474                 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
475     }
476
477     /**
478      * {@inheritDoc}
479      * 
480      * @throws IllegalArgumentException
481      */
482     @Override
483     public DmaapTopicSource build(List<String> servers, String topic) {
484         return this.build(servers, topic, null, null);
485     }
486
487     /**
488      * {@inheritDoc}
489      */
490     @Override
491     public void destroy(String topic) {
492
493         if (topic == null || topic.isEmpty()) {
494             throw new IllegalArgumentException(MISSING_TOPIC);
495         }
496
497         DmaapTopicSource uebTopicSource;
498
499         synchronized (this) {
500             if (!dmaapTopicSources.containsKey(topic)) {
501                 return;
502             }
503
504             uebTopicSource = dmaapTopicSources.remove(topic);
505         }
506
507         uebTopicSource.shutdown();
508     }
509
510     /**
511      * {@inheritDoc}
512      */
513     @Override
514     public DmaapTopicSource get(String topic) {
515
516         if (topic == null || topic.isEmpty()) {
517             throw new IllegalArgumentException(MISSING_TOPIC);
518         }
519
520         synchronized (this) {
521             if (dmaapTopicSources.containsKey(topic)) {
522                 return dmaapTopicSources.get(topic);
523             } else {
524                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
525             }
526         }
527     }
528
529     @Override
530     public synchronized List<DmaapTopicSource> inventory() {
531         return new ArrayList<>(this.dmaapTopicSources.values());
532     }
533
534     @Override
535     public void destroy() {
536         List<DmaapTopicSource> readers = this.inventory();
537         for (DmaapTopicSource reader : readers) {
538             reader.shutdown();
539         }
540
541         synchronized (this) {
542             this.dmaapTopicSources.clear();
543         }
544     }
545
546     @Override
547     public String toString() {
548         StringBuilder builder = new StringBuilder();
549         builder.append("IndexedDmaapTopicSourceFactory []");
550         return builder.toString();
551     }
552
553 }
554