9f60556cab5b1cd42b5bf575eb0d8048ab66bd26
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
31 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
32 import org.openecomp.policy.common.logging.flexlogger.Logger;
33 import org.openecomp.policy.drools.properties.PolicyProperties;
34
35 /**
36  * DMAAP Topic Source Factory
37  */
38 public interface DmaapTopicSourceFactory {
39         public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40         public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41         public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42         public final String DME2_VERSION_PROPERTY = "Version";
43         public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44         public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45         public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46         public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
47         
48         /**
49          * Creates an DMAAP Topic Source based on properties files
50          * 
51          * @param properties Properties containing initialization values
52          * 
53          * @return an DMAAP Topic Source
54          * @throws IllegalArgumentException if invalid parameters are present
55          */
56         public List<DmaapTopicSource> build(Properties properties)
57                         throws IllegalArgumentException;
58         
59         /**
60          * Instantiates a new DMAAP Topic Source
61          * 
62          * @param servers list of servers
63          * @param topic topic name
64          * @param apiKey API Key
65          * @param apiSecret API Secret
66          * @param userName user name
67          * @param password password
68          * @param consumerGroup Consumer Group
69          * @param consumerInstance Consumer Instance
70          * @param fetchTimeout Read Fetch Timeout
71          * @param fetchLimit Fetch Limit
72          * @param managed is this endpoind managed?
73          * @param useHttps does the connection use HTTPS?
74          * @param allowSelfSignedCerts does connection allow self-signed certificates?
75          * 
76          * @return an DMAAP Topic Source
77          * @throws IllegalArgumentException if invalid parameters are present
78          */
79         public DmaapTopicSource build(List<String> servers, 
80                                                                 String topic, 
81                                                                 String apiKey, 
82                                                                 String apiSecret, 
83                                                                 String userName, 
84                                                                 String password,
85                                                                 String consumerGroup, 
86                                                                 String consumerInstance,
87                                                                 int fetchTimeout,
88                                                                 int fetchLimit,
89                                                                 boolean managed,
90                                                                 boolean useHttps,
91                                                                 boolean allowSelfSignedCerts)
92                         throws IllegalArgumentException;
93         
94         /**
95          * Instantiates a new DMAAP Topic Source
96          * 
97          * @param servers list of servers
98          * @param topic topic name
99          * @param apiKey API Key
100          * @param apiSecret API Secret
101          * @param userName user name
102          * @param password password
103          * @param consumerGroup Consumer Group
104          * @param consumerInstance Consumer Instance
105          * @param fetchTimeout Read Fetch Timeout
106          * @param fetchLimit Fetch Limit
107          * @param environment DME2 environment
108          * @param aftEnvironment DME2 AFT environment
109          * @param partner DME2 Partner
110          * @param latitude DME2 latitude
111          * @param longitude DME2 longitude
112          * @param additionalProps additional properties to pass to DME2
113          * @param managed is this endpoind managed?
114          * @param useHttps does the connection use HTTPS?
115          * @param allowSelfSignedCerts does connection allow self-signed certificates?
116          * 
117          * @return an DMAAP Topic Source
118          * @throws IllegalArgumentException if invalid parameters are present
119          */
120         public DmaapTopicSource build(List<String> servers, 
121                                                                 String topic, 
122                                                                 String apiKey, 
123                                                                 String apiSecret, 
124                                                                 String userName, 
125                                                                 String password,
126                                                                 String consumerGroup, 
127                                                                 String consumerInstance,
128                                                                 int fetchTimeout,
129                                                                 int fetchLimit,
130                                                                 String environment,
131                                                                 String aftEnvironment,
132                                                                 String partner,
133                                                                 String latitude,
134                                                                 String longitude,
135                                                                 Map<String,String> additionalProps,
136                                                                 boolean managed,
137                                                                 boolean useHttps, 
138                                                                 boolean allowSelfSignedCerts)
139                         throws IllegalArgumentException;
140         
141         /**
142          * Instantiates a new DMAAP Topic Source
143          * 
144          * @param servers list of servers
145          * @param topic topic name
146          * @param apiKey API Key
147          * @param apiSecret API Secret
148          * 
149          * @return an DMAAP Topic Source
150          * @throws IllegalArgumentException if invalid parameters are present
151          */
152         public DmaapTopicSource build(List<String> servers, 
153                                                                 String topic, 
154                                                                 String apiKey, 
155                                                                 String apiSecret)
156                         throws IllegalArgumentException;
157
158         /**
159          * Instantiates a new DMAAP Topic Source
160          * 
161          * @param uebTopicReaderType Implementation type
162          * @param servers list of servers
163          * @param topic topic name
164          * 
165          * @return an DMAAP Topic Source
166          * @throws IllegalArgumentException if invalid parameters are present
167          */
168         public DmaapTopicSource build(List<String> servers,
169                                                                 String topic)
170                         throws IllegalArgumentException;        
171         
172         /**
173          * Destroys an DMAAP Topic Source based on a topic
174          * 
175          * @param topic topic name
176          * @throws IllegalArgumentException if invalid parameters are present
177          */
178         public void destroy(String topic);
179         
180         /**
181          * Destroys all DMAAP Topic Sources
182          */
183         public void destroy();
184         
185         /**
186          * gets an DMAAP Topic Source based on topic name
187          * @param topic the topic name
188          * @return an DMAAP Topic Source with topic name
189          * @throws IllegalArgumentException if an invalid topic is provided
190          * @throws IllegalStateException if the DMAAP Topic Source is 
191          * an incorrect state
192          */
193         public DmaapTopicSource get(String topic)
194                    throws IllegalArgumentException, IllegalStateException;
195         
196         /**
197          * Provides a snapshot of the DMAAP Topic Sources
198          * @return a list of the DMAAP Topic Sources
199          */
200         public List<DmaapTopicSource> inventory();
201 }
202
203
204 /* ------------- implementation ----------------- */
205
206 /**
207  * Factory of DMAAP Source Topics indexed by topic name
208  */
209
210 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
211         // get an instance of logger 
212         private static Logger  logger = FlexLogger.getLogger(IndexedDmaapTopicSourceFactory.class);             
213         /**
214          * UEB Topic Name Index
215          */
216         protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
217                         new HashMap<String, DmaapTopicSource>();
218         
219         /**
220          * {@inheritDoc}
221          */
222         @Override
223         public DmaapTopicSource build(List<String> servers, 
224                                                                 String topic, 
225                                                                 String apiKey, 
226                                                                 String apiSecret, 
227                                                                 String userName, 
228                                                                 String password,
229                                                                 String consumerGroup, 
230                                                                 String consumerInstance,
231                                                                 int fetchTimeout,
232                                                                 int fetchLimit,
233                                                                 String environment,
234                                                                 String aftEnvironment,
235                                                                 String partner,
236                                                                 String latitude,
237                                                                 String longitude,
238                                                                 Map<String,String> additionalProps,
239                                                                 boolean managed,
240                                                                 boolean useHttps,
241                                                                 boolean allowSelfSignedCerts) 
242                         throws IllegalArgumentException {
243                 
244                 if (topic == null || topic.isEmpty()) {
245                         throw new IllegalArgumentException("A topic must be provided");
246                 }
247                 
248                 synchronized(this) {
249                         if (dmaapTopicSources.containsKey(topic)) {
250                                 return dmaapTopicSources.get(topic);
251                         }
252                         
253                         DmaapTopicSource dmaapTopicSource = 
254                                         new SingleThreadedDmaapTopicSource(servers, topic, 
255                                                                                                          apiKey, apiSecret, userName, password,
256                                                                                                          consumerGroup, consumerInstance, 
257                                                                                                          fetchTimeout, fetchLimit,
258                                                                                                          environment, aftEnvironment, partner,
259                                                                                                          latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
260                         
261                         if (managed)
262                                 dmaapTopicSources.put(topic, dmaapTopicSource);
263                         
264                         return dmaapTopicSource;
265                 }
266         }
267         /**
268          * {@inheritDoc}
269          */
270         @Override
271         public DmaapTopicSource build(List<String> servers, 
272                                                                 String topic, 
273                                                                 String apiKey, 
274                                                                 String apiSecret, 
275                                                                 String userName, 
276                                                                 String password,
277                                                                 String consumerGroup, 
278                                                                 String consumerInstance,
279                                                                 int fetchTimeout,
280                                                                 int fetchLimit,
281                                                                 boolean managed,
282                                                                 boolean useHttps,
283                                                                 boolean allowSelfSignedCerts) 
284                         throws IllegalArgumentException {
285                 
286                 if (servers == null || servers.isEmpty()) {
287                         throw new IllegalArgumentException("DMaaP Server(s) must be provided");
288                 }
289                 
290                 if (topic == null || topic.isEmpty()) {
291                         throw new IllegalArgumentException("A topic must be provided");
292                 }
293                 
294                 synchronized(this) {
295                         if (dmaapTopicSources.containsKey(topic)) {
296                                 return dmaapTopicSources.get(topic);
297                         }
298                         
299                         DmaapTopicSource dmaapTopicSource = 
300                                         new SingleThreadedDmaapTopicSource(servers, topic, 
301                                                                                                          apiKey, apiSecret, userName, password,
302                                                                                                          consumerGroup, consumerInstance, 
303                                                                                                          fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
304                         
305                         if (managed)
306                                 dmaapTopicSources.put(topic, dmaapTopicSource);
307                         
308                         return dmaapTopicSource;
309                 }
310         }
311         
312         /**
313          * {@inheritDoc}
314          */
315         @Override
316         public List<DmaapTopicSource> build(Properties properties) 
317                         throws IllegalArgumentException {
318                 
319                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
320                 if (readTopics == null || readTopics.isEmpty()) {
321                         logger.warn("No topic for UEB Source " + properties);
322                         return new ArrayList<DmaapTopicSource>();
323                 }
324                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
325                 
326                 List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>();
327                 synchronized(this) {
328                         for (String topic: readTopicList) {
329                                 
330                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + 
331                                                         topic + 
332                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
333                                 
334                                 List<String> serverList;
335                                 if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
336                                 else serverList = new ArrayList<>();
337                                 
338                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
339                                                                                    "." + topic + 
340                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
341                                 
342                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
343                                                                                   "." + topic + 
344                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
345                                 
346                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
347                                                                                           "." + topic + 
348                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
349
350                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
351                                                                                           "." + topic + 
352                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
353                                 
354                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
355                                                                                       "." + topic + 
356                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
357                                 
358                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
359                                                                                          "." + topic + 
360                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
361                                 
362                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
363                                                                                            "." + topic + 
364                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
365                                 
366                                 /* DME2 Properties */
367                                 
368                                 String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
369                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
370
371                                 String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
372                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
373
374                                 String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
375                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
376                                 
377                                 String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
378                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
379
380                                 String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
381                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
382
383                                 String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
384                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
385
386                                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
387                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
388
389                                 String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
390                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
391
392                                 String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS
393                                                 + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
394
395                                 String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
396                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
397
398                                 String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
399                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
400
401                                 String dme2SessionStickinessRequired = properties
402                                                 .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
403                                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
404                                 
405                                 Map<String,String> dme2AdditionalProps = new HashMap<>();
406                                 
407                                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
408                                         dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
409                                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
410                                         dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
411                                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
412                                         dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
413                                 if (dme2Version != null && !dme2Version.isEmpty())
414                                         dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
415                                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
416                                         dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
417                                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
418                                         dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
419                                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
420                                         dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
421                                 
422                                 
423                                 if (servers == null || servers.isEmpty()) {
424
425                                         logger.error("No DMaaP servers or DME2 ServiceName provided");
426                                         continue;
427                                 }
428                                 
429                                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
430                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
431                                         try {
432                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
433                                         } catch (NumberFormatException nfe) {
434                                                 logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
435                                         }
436                                 }
437                                         
438                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
439                                                                  "." + topic + 
440                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
441                                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
442                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
443                                         try {
444                                                 fetchLimit = Integer.parseInt(fetchLimitString);
445                                         } catch (NumberFormatException nfe) {
446                                                 logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
447                                         }
448                                 }
449                                 
450                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
451                                                                "." + topic + 
452                                                               PolicyProperties.PROPERTY_MANAGED_SUFFIX);
453                                 boolean managed = true;
454                                 if (managedString != null && !managedString.isEmpty()) {
455                                         managed = Boolean.parseBoolean(managedString);
456                                 }
457                                 
458                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
459                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
460
461                                         //default is to use HTTP if no https property exists
462                                 boolean useHttps = false;
463                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
464                                         useHttps = Boolean.parseBoolean(useHttpsString);
465                                 }
466                                 
467                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
468                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
469
470                                         //default is to disallow self-signed certs 
471                                 boolean allowSelfSignedCerts = false;
472                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
473                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
474                                 }                               
475                                 
476                                 
477                                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, 
478                                                                                                            apiKey, apiSecret, aafMechId, aafPassword,
479                                                                                                            consumerGroup, consumerInstance, 
480                                                                                                            fetchTimeout, fetchLimit, 
481                                                                                                            dme2Environment, dme2AftEnvironment, dme2Partner,
482                                                                                                            dme2Latitude, dme2Longitude, dme2AdditionalProps,
483                                                                                                            managed, useHttps, allowSelfSignedCerts);
484                                 
485                                 dmaapTopicSource_s.add(uebTopicSource);
486                         }
487                 }
488                 return dmaapTopicSource_s;
489         }
490         
491         /**
492          * {@inheritDoc}
493          * @throws IllegalArgumentException 
494          */
495         @Override
496         public DmaapTopicSource build(List<String> servers, 
497                                                                 String topic,
498                                                                 String apiKey, 
499                                                                 String apiSecret) throws IllegalArgumentException {
500                 return this.build(servers, topic, 
501                                                   apiKey, apiSecret, null, null,
502                                                   null, null,
503                                                   DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
504                                                   DmaapTopicSource.DEFAULT_LIMIT_FETCH,
505                                                   true,
506                                                   false,
507                                                   false);
508         }
509
510         /**
511          * {@inheritDoc}
512          * @throws IllegalArgumentException 
513          */
514         @Override
515         public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException {
516                 return this.build(servers, topic, null, null);
517         }       
518
519         /**
520          * {@inheritDoc}
521          */
522         @Override
523         public void destroy(String topic) 
524                    throws IllegalArgumentException {
525                 
526                 if (topic == null || topic.isEmpty()) {
527                         throw new IllegalArgumentException("A topic must be provided");
528                 }
529                 
530                 DmaapTopicSource uebTopicSource;
531                 
532                 synchronized(this) {
533                         if (!dmaapTopicSources.containsKey(topic)) {
534                                 return;
535                         }
536                         
537                         uebTopicSource = dmaapTopicSources.remove(topic);
538                 }
539                 
540                 uebTopicSource.shutdown();
541         }
542
543         /**
544          * {@inheritDoc}
545          */
546         @Override
547         public DmaapTopicSource get(String topic) 
548                throws IllegalArgumentException, IllegalStateException {
549                 
550                 if (topic == null || topic.isEmpty()) {
551                         throw new IllegalArgumentException("A topic must be provided");
552                 }
553                 
554                 synchronized(this) {
555                         if (dmaapTopicSources.containsKey(topic)) {
556                                 return dmaapTopicSources.get(topic);
557                         } else {
558                                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
559                         }
560                 }
561         }
562
563         @Override
564         public synchronized List<DmaapTopicSource> inventory() {
565                  List<DmaapTopicSource> readers = 
566                                  new ArrayList<DmaapTopicSource>(this.dmaapTopicSources.values());
567                  return readers;
568         }
569
570         @Override
571         public void destroy() {
572                 List<DmaapTopicSource> readers = this.inventory();
573                 for (DmaapTopicSource reader: readers) {
574                         reader.shutdown();
575                 }
576                 
577                 synchronized(this) {
578                         this.dmaapTopicSources.clear();
579                 }
580         }
581         
582 }
583