cc31c2a57465fd9c71d0e757da644723b8448060
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus;
23
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Properties;
30
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams;
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
33 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * DMAAP Topic Source Factory.
39  */
40 public interface DmaapTopicSourceFactory {
41     public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
42     public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
43     public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
44     public final String DME2_VERSION_PROPERTY = "Version";
45     public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
46     public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
47     public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
48     public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
49
50     /**
51      * Creates an DMAAP Topic Source based on properties files.
52      * 
53      * @param properties Properties containing initialization values
54      * 
55      * @return an DMAAP Topic Source
56      * @throws IllegalArgumentException if invalid parameters are present
57      */
58     public List<DmaapTopicSource> build(Properties properties);
59
60     /**
61      * Instantiates a new DMAAP Topic Source.
62      * 
63      * @param servers list of servers
64      * @param topic topic name
65      * @param apiKey API Key
66      * @param apiSecret API Secret
67      * @param userName user name
68      * @param password password
69      * @param consumerGroup Consumer Group
70      * @param consumerInstance Consumer Instance
71      * @param fetchTimeout Read Fetch Timeout
72      * @param fetchLimit Fetch Limit
73      * @param managed is this endpoind managed?
74      * @param useHttps does the connection use HTTPS?
75      * @param allowSelfSignedCerts does connection allow self-signed certificates?
76      * 
77      * @return an DMAAP Topic Source
78      * @throws IllegalArgumentException if invalid parameters are present
79      */
80     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
81                                   String password, String consumerGroup, String consumerInstance,
82                                   int fetchTimeout, int fetchLimit,
83                                   boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
84
85     /**
86      * Instantiates a new DMAAP Topic Source.
87      *
88      * @param servers list of servers
89      * @param topic topic name
90      * @param apiKey API Key
91      * @param apiSecret API Secret
92      * @param userName user name
93      * @param password password
94      * @param consumerGroup Consumer Group
95      * @param consumerInstance Consumer Instance
96      * @param fetchTimeout Read Fetch Timeout
97      * @param fetchLimit Fetch Limit
98      * @param environment DME2 environment
99      * @param aftEnvironment DME2 AFT environment
100      * @param partner DME2 Partner
101      * @param latitude DME2 latitude
102      * @param longitude DME2 longitude
103      * @param additionalProps additional properties to pass to DME2
104      * @param managed is this endpoind managed?
105      * @param useHttps does the connection use HTTPS?
106      * @param allowSelfSignedCerts does connection allow self-signed certificates?
107      *
108      * @return an DMAAP Topic Source
109      * @throws IllegalArgumentException if invalid parameters are present
110      */
111     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
112                                   String password, String consumerGroup, String consumerInstance, int fetchTimeout,
113                                   int fetchLimit, String environment, String aftEnvironment, String partner, 
114                                   String latitude, String longitude, Map<String, String> additionalProps, 
115                                   boolean managed, boolean useHttps, boolean allowSelfSignedCerts);
116
117     /**
118      * Instantiates a new DMAAP Topic Source.
119      * 
120      * @param servers list of servers
121      * @param topic topic name
122      * @param apiKey API Key
123      * @param apiSecret API Secret
124      * 
125      * @return an DMAAP Topic Source
126      * @throws IllegalArgumentException if invalid parameters are present
127      */
128     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret);
129
130     /**
131      * Instantiates a new DMAAP Topic Source.
132      * 
133      * @param servers list of servers
134      * @param topic topic name
135      * 
136      * @return an DMAAP Topic Source
137      * @throws IllegalArgumentException if invalid parameters are present
138      */
139     public DmaapTopicSource build(List<String> servers, String topic);
140
141     /**
142      * Destroys an DMAAP Topic Source based on a topic.
143      * 
144      * @param topic topic name
145      * @throws IllegalArgumentException if invalid parameters are present
146      */
147     public void destroy(String topic);
148
149     /**
150      * Destroys all DMAAP Topic Sources.
151      */
152     public void destroy();
153
154     /**
155      * Gets an DMAAP Topic Source based on topic name.
156      * 
157      * @param topic the topic name
158      * @return an DMAAP Topic Source with topic name
159      * @throws IllegalArgumentException if an invalid topic is provided
160      * @throws IllegalStateException if the DMAAP Topic Source is an incorrect state
161      */
162     public DmaapTopicSource get(String topic);
163
164     /**
165      * Provides a snapshot of the DMAAP Topic Sources.
166      * 
167      * @return a list of the DMAAP Topic Sources
168      */
169     public List<DmaapTopicSource> inventory();
170 }
171
172
173 /* ------------- implementation ----------------- */
174
175 /**
176  * Factory of DMAAP Source Topics indexed by topic name.
177  */
178
179 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
180     private static final String MISSING_TOPIC = "A topic must be provided";
181
182     /**
183      * Logger.
184      */
185     private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);
186
187     /**
188      * DMaaP Topic Name Index.
189      */
190     protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<>();
191
192     /**
193      * {@inheritDoc}
194      */
195     @Override
196     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
197             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
198             String environment, String aftEnvironment, String partner, String latitude, String longitude,
199             Map<String, String> additionalProps, boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
200
201         if (topic == null || topic.isEmpty()) {
202             throw new IllegalArgumentException(MISSING_TOPIC);
203         }
204
205         synchronized (this) {
206             if (dmaapTopicSources.containsKey(topic)) {
207                 return dmaapTopicSources.get(topic);
208             }
209
210             DmaapTopicSource dmaapTopicSource = new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
211                     .servers(servers)
212                     .topic(topic)
213                     .apiKey(apiKey)
214                     .apiSecret(apiSecret)
215                     .userName(userName)
216                     .password(password)
217                     .consumerGroup(consumerGroup)
218                     .consumerInstance(consumerInstance)
219                     .fetchTimeout(fetchTimeout)
220                     .fetchLimit(fetchLimit)
221                     .environment(environment)
222                     .aftEnvironment(aftEnvironment)
223                     .partner(partner)
224                     .latitude(latitude)
225                     .longitude(longitude)
226                     .additionalProps(additionalProps)
227                     .useHttps(useHttps)
228                     .allowSelfSignedCerts(allowSelfSignedCerts)
229                     .build());
230
231             if (managed) {
232                 dmaapTopicSources.put(topic, dmaapTopicSource);
233             }
234
235             return dmaapTopicSource;
236         }
237     }
238
239     /**
240      * {@inheritDoc}
241      */
242     @Override
243     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
244             String password, String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit,
245             boolean managed, boolean useHttps, boolean allowSelfSignedCerts) {
246
247         if (servers == null || servers.isEmpty()) {
248             throw new IllegalArgumentException("DMaaP Server(s) must be provided");
249         }
250
251         if (topic == null || topic.isEmpty()) {
252             throw new IllegalArgumentException(MISSING_TOPIC);
253         }
254
255         synchronized (this) {
256             if (dmaapTopicSources.containsKey(topic)) {
257                 return dmaapTopicSources.get(topic);
258             }
259
260             DmaapTopicSource dmaapTopicSource =
261                     new SingleThreadedDmaapTopicSource(BusTopicParams.builder()
262                             .servers(servers)
263                             .topic(topic)
264                             .apiKey(apiKey)
265                             .apiSecret(apiSecret)
266                             .userName(userName)
267                             .password(password)
268                             .consumerGroup(consumerGroup)
269                             .consumerInstance(consumerInstance)
270                             .fetchTimeout(fetchTimeout)
271                             .fetchLimit(fetchLimit)
272                             .useHttps(useHttps)
273                             .allowSelfSignedCerts(allowSelfSignedCerts)
274                             .build());
275
276             if (managed) {
277                 dmaapTopicSources.put(topic, dmaapTopicSource);
278             }
279
280             return dmaapTopicSource;
281         }
282     }
283
284     /**
285      * {@inheritDoc}
286      */
287     @Override
288     public List<DmaapTopicSource> build(Properties properties) {
289
290         String readTopics = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
291         if (readTopics == null || readTopics.isEmpty()) {
292             logger.info("{}: no topic for DMaaP Source", this);
293             return new ArrayList<>();
294         }
295         List<String> readTopicList = new ArrayList<>(Arrays.asList(readTopics.split("\\s*,\\s*")));
296
297         List<DmaapTopicSource> dmaapTopicSourceLst = new ArrayList<>();
298         synchronized (this) {
299             for (String topic : readTopicList) {
300                 if (this.dmaapTopicSources.containsKey(topic)) {
301                     dmaapTopicSourceLst.add(this.dmaapTopicSources.get(topic));
302                     continue;
303                 }
304
305                 String servers = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
306                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
307
308                 List<String> serverList;
309                 if (servers != null && !servers.isEmpty()) {
310                     serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
311                 } else {
312                     serverList = new ArrayList<>();
313                 }
314
315                 String apiKey = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
316                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
317
318                 String apiSecret = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
319                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
320
321                 String aafMechId = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
322                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
323
324                 String aafPassword = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
325                         + topic + PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
326
327                 String consumerGroup = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
328                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
329
330                 String consumerInstance = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
331                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
332
333                 String fetchTimeoutString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
334                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
335
336                 /* DME2 Properties */
337
338                 String dme2Environment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
339                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
340
341                 String dme2AftEnvironment = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
342                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
343
344                 String dme2Partner = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
345                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
346
347                 String dme2RouteOffer = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
348                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
349
350                 String dme2Latitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
351                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
352
353                 String dme2Longitude = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
354                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
355
356                 String dme2EpReadTimeoutMs =
357                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
358                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
359
360                 String dme2EpConnTimeout = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
361                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
362
363                 String dme2RoundtripTimeoutMs =
364                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
365                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
366
367                 String dme2Version = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
368                         + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
369
370                 String dme2SubContextPath = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
371                         + "." + topic + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
372
373                 String dme2SessionStickinessRequired =
374                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
375                                 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
376
377                 Map<String, String> dme2AdditionalProps = new HashMap<>();
378
379                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) {
380                     dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
381                 }
382                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) {
383                     dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
384                 }
385                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) {
386                     dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
387                 }
388                 if (dme2Version != null && !dme2Version.isEmpty()) {
389                     dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
390                 }
391                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) {
392                     dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
393                 }
394                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) {
395                     dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
396                 }
397                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) {
398                     dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
399                 }
400
401
402                 if (servers == null || servers.isEmpty()) {
403
404                     logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
405                     continue;
406                 }
407
408                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
409                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
410                     try {
411                         fetchTimeout = Integer.parseInt(fetchTimeoutString);
412                     } catch (NumberFormatException nfe) {
413                         logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", this, fetchTimeoutString,
414                                 topic);
415                     }
416                 }
417
418                 String fetchLimitString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
419                         + "." + topic + PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
420                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
421                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
422                     try {
423                         fetchLimit = Integer.parseInt(fetchLimitString);
424                     } catch (NumberFormatException nfe) {
425                         logger.warn("{}: fetch limit {} is in invalid format for topic {} ", this, fetchLimitString,
426                                 topic);
427                     }
428                 }
429
430                 String managedString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
431                         + "." + topic + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX);
432                 boolean managed = true;
433                 if (managedString != null && !managedString.isEmpty()) {
434                     managed = Boolean.parseBoolean(managedString);
435                 }
436
437                 String useHttpsString = properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS
438                         + "." + topic + PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
439
440                 // default is to use HTTP if no https property exists
441                 boolean useHttps = false;
442                 if (useHttpsString != null && !useHttpsString.isEmpty()) {
443                     useHttps = Boolean.parseBoolean(useHttpsString);
444                 }
445
446                 String allowSelfSignedCertsString =
447                         properties.getProperty(PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
448                                 + PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
449
450                 // default is to disallow self-signed certs
451                 boolean allowSelfSignedCerts = false;
452                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()) {
453                     allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
454                 }
455
456
457                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId,
458                         aafPassword, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, dme2Environment,
459                         dme2AftEnvironment, dme2Partner, dme2Latitude, dme2Longitude, dme2AdditionalProps, managed,
460                         useHttps, allowSelfSignedCerts);
461
462                 dmaapTopicSourceLst.add(uebTopicSource);
463             }
464         }
465         return dmaapTopicSourceLst;
466     }
467
468     /**
469      * {@inheritDoc}
470      * 
471      * @throws IllegalArgumentException
472      */
473     @Override
474     public DmaapTopicSource build(List<String> servers, String topic, String apiKey, String apiSecret) {
475         return this.build(servers, topic, apiKey, apiSecret, null, null, null, null,
476                 DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, true, false, false);
477     }
478
479     /**
480      * {@inheritDoc}
481      * 
482      * @throws IllegalArgumentException
483      */
484     @Override
485     public DmaapTopicSource build(List<String> servers, String topic) {
486         return this.build(servers, topic, null, null);
487     }
488
489     /**
490      * {@inheritDoc}
491      */
492     @Override
493     public void destroy(String topic) {
494
495         if (topic == null || topic.isEmpty()) {
496             throw new IllegalArgumentException(MISSING_TOPIC);
497         }
498
499         DmaapTopicSource uebTopicSource;
500
501         synchronized (this) {
502             if (!dmaapTopicSources.containsKey(topic)) {
503                 return;
504             }
505
506             uebTopicSource = dmaapTopicSources.remove(topic);
507         }
508
509         uebTopicSource.shutdown();
510     }
511
512     @Override
513     public void destroy() {
514         List<DmaapTopicSource> readers = this.inventory();
515         for (DmaapTopicSource reader : readers) {
516             reader.shutdown();
517         }
518
519         synchronized (this) {
520             this.dmaapTopicSources.clear();
521         }
522     }
523
524     /**
525      * {@inheritDoc}
526      */
527     @Override
528     public DmaapTopicSource get(String topic) {
529
530         if (topic == null || topic.isEmpty()) {
531             throw new IllegalArgumentException(MISSING_TOPIC);
532         }
533
534         synchronized (this) {
535             if (dmaapTopicSources.containsKey(topic)) {
536                 return dmaapTopicSources.get(topic);
537             } else {
538                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
539             }
540         }
541     }
542
543     @Override
544     public synchronized List<DmaapTopicSource> inventory() {
545         return new ArrayList<>(this.dmaapTopicSources.values());
546     }
547
548     @Override
549     public String toString() {
550         StringBuilder builder = new StringBuilder();
551         builder.append("IndexedDmaapTopicSourceFactory []");
552         return builder.toString();
553     }
554
555 }
556