a54cb6f45890b67c9756f6720819e14299423d4e
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
31 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
32 import org.openecomp.policy.common.logging.flexlogger.Logger;
33 import org.openecomp.policy.drools.properties.PolicyProperties;
34
35 /**
36  * DMAAP Topic Source Factory
37  */
38 public interface DmaapTopicSourceFactory {
39         public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40         public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41         public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42         public final String DME2_VERSION_PROPERTY = "Version";
43         public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44         public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45         public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46         public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
47         
48         /**
49          * Creates an DMAAP Topic Source based on properties files
50          * 
51          * @param properties Properties containing initialization values
52          * 
53          * @return an DMAAP Topic Source
54          * @throws IllegalArgumentException if invalid parameters are present
55          */
56         public List<DmaapTopicSource> build(Properties properties)
57                         throws IllegalArgumentException;
58         
59         /**
60          * Instantiates a new DMAAP Topic Source
61          * 
62          * @param servers list of servers
63          * @param topic topic name
64          * @param apiKey API Key
65          * @param apiSecret API Secret
66          * @param userName user name
67          * @param password password
68          * @param consumerGroup Consumer Group
69          * @param consumerInstance Consumer Instance
70          * @param fetchTimeout Read Fetch Timeout
71          * @param fetchLimit Fetch Limit
72          * @param managed is this endpoind managed?
73          * @param useHttps does the connection use HTTPS?
74          * @param allowSelfSignedCerts does connection allow self-signed certificates?
75          * 
76          * @return an DMAAP Topic Source
77          * @throws IllegalArgumentException if invalid parameters are present
78          */
79         public DmaapTopicSource build(List<String> servers, 
80                                                                 String topic, 
81                                                                 String apiKey, 
82                                                                 String apiSecret, 
83                                                                 String userName, 
84                                                                 String password,
85                                                                 String consumerGroup, 
86                                                                 String consumerInstance,
87                                                                 int fetchTimeout,
88                                                                 int fetchLimit,
89                                                                 boolean managed,
90                                                                 boolean useHttps,
91                                                                 boolean allowSelfSignedCerts)
92                         throws IllegalArgumentException;
93         
94         /**
95          * Instantiates a new DMAAP Topic Source
96          * 
97          * @param servers list of servers
98          * @param topic topic name
99          * @param apiKey API Key
100          * @param apiSecret API Secret
101          * @param userName user name
102          * @param password password
103          * @param consumerGroup Consumer Group
104          * @param consumerInstance Consumer Instance
105          * @param fetchTimeout Read Fetch Timeout
106          * @param fetchLimit Fetch Limit
107          * @param environment DME2 environment
108          * @param aftEnvironment DME2 AFT environment
109          * @param partner DME2 Partner
110          * @param latitude DME2 latitude
111          * @param longitude DME2 longitude
112          * @param additionalProps additional properties to pass to DME2
113          * @param managed is this endpoind managed?
114          * @param useHttps does the connection use HTTPS?
115          * @param allowSelfSignedCerts does connection allow self-signed certificates?
116          * 
117          * @return an DMAAP Topic Source
118          * @throws IllegalArgumentException if invalid parameters are present
119          */
120         public DmaapTopicSource build(List<String> servers, 
121                                                                 String topic, 
122                                                                 String apiKey, 
123                                                                 String apiSecret, 
124                                                                 String userName, 
125                                                                 String password,
126                                                                 String consumerGroup, 
127                                                                 String consumerInstance,
128                                                                 int fetchTimeout,
129                                                                 int fetchLimit,
130                                                                 String environment,
131                                                                 String aftEnvironment,
132                                                                 String partner,
133                                                                 String latitude,
134                                                                 String longitude,
135                                                                 Map<String,String> additionalProps,
136                                                                 boolean managed,
137                                                                 boolean useHttps, 
138                                                                 boolean allowSelfSignedCerts)
139                         throws IllegalArgumentException;
140         
141         /**
142          * Instantiates a new DMAAP Topic Source
143          * 
144          * @param servers list of servers
145          * @param topic topic name
146          * @param apiKey API Key
147          * @param apiSecret API Secret
148          * 
149          * @return an DMAAP Topic Source
150          * @throws IllegalArgumentException if invalid parameters are present
151          */
152         public DmaapTopicSource build(List<String> servers, 
153                                                                 String topic, 
154                                                                 String apiKey, 
155                                                                 String apiSecret)
156                         throws IllegalArgumentException;
157
158         /**
159          * Instantiates a new DMAAP Topic Source
160          * 
161          * @param uebTopicReaderType Implementation type
162          * @param servers list of servers
163          * @param topic topic name
164          * 
165          * @return an DMAAP Topic Source
166          * @throws IllegalArgumentException if invalid parameters are present
167          */
168         public DmaapTopicSource build(List<String> servers,
169                                                                 String topic)
170                         throws IllegalArgumentException;        
171         
172         /**
173          * Destroys an DMAAP Topic Source based on a topic
174          * 
175          * @param topic topic name
176          * @throws IllegalArgumentException if invalid parameters are present
177          */
178         public void destroy(String topic);
179         
180         /**
181          * Destroys all DMAAP Topic Sources
182          */
183         public void destroy();
184         
185         /**
186          * gets an DMAAP Topic Source based on topic name
187          * @param topic the topic name
188          * @return an DMAAP Topic Source with topic name
189          * @throws IllegalArgumentException if an invalid topic is provided
190          * @throws IllegalStateException if the DMAAP Topic Source is 
191          * an incorrect state
192          */
193         public DmaapTopicSource get(String topic)
194                    throws IllegalArgumentException, IllegalStateException;
195         
196         /**
197          * Provides a snapshot of the DMAAP Topic Sources
198          * @return a list of the DMAAP Topic Sources
199          */
200         public List<DmaapTopicSource> inventory();
201 }
202
203
204 /* ------------- implementation ----------------- */
205
206 /**
207  * Factory of DMAAP Source Topics indexed by topic name
208  */
209
210 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
211         // get an instance of logger 
212         private static Logger  logger = FlexLogger.getLogger(IndexedDmaapTopicSourceFactory.class);             
213         /**
214          * UEB Topic Name Index
215          */
216         protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
217                         new HashMap<String, DmaapTopicSource>();
218         
219         /**
220          * {@inheritDoc}
221          */
222         @Override
223         public DmaapTopicSource build(List<String> servers, 
224                                                                 String topic, 
225                                                                 String apiKey, 
226                                                                 String apiSecret, 
227                                                                 String userName, 
228                                                                 String password,
229                                                                 String consumerGroup, 
230                                                                 String consumerInstance,
231                                                                 int fetchTimeout,
232                                                                 int fetchLimit,
233                                                                 String environment,
234                                                                 String aftEnvironment,
235                                                                 String partner,
236                                                                 String latitude,
237                                                                 String longitude,
238                                                                 Map<String,String> additionalProps,
239                                                                 boolean managed,
240                                                                 boolean useHttps,
241                                                                 boolean allowSelfSignedCerts) 
242                         throws IllegalArgumentException {
243                 
244                 if (topic == null || topic.isEmpty()) {
245                         throw new IllegalArgumentException("A topic must be provided");
246                 }
247                 
248                 synchronized(this) {
249                         if (dmaapTopicSources.containsKey(topic)) {
250                                 return dmaapTopicSources.get(topic);
251                         }
252                         
253                         DmaapTopicSource dmaapTopicSource = 
254                                         new SingleThreadedDmaapTopicSource(servers, topic, 
255                                                                                                          apiKey, apiSecret, userName, password,
256                                                                                                          consumerGroup, consumerInstance, 
257                                                                                                          fetchTimeout, fetchLimit,
258                                                                                                          environment, aftEnvironment, partner,
259                                                                                                          latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
260                         
261                         if (managed)
262                                 dmaapTopicSources.put(topic, dmaapTopicSource);
263                         
264                         return dmaapTopicSource;
265                 }
266         }
267         /**
268          * {@inheritDoc}
269          */
270         @Override
271         public DmaapTopicSource build(List<String> servers, 
272                                                                 String topic, 
273                                                                 String apiKey, 
274                                                                 String apiSecret, 
275                                                                 String userName, 
276                                                                 String password,
277                                                                 String consumerGroup, 
278                                                                 String consumerInstance,
279                                                                 int fetchTimeout,
280                                                                 int fetchLimit,
281                                                                 boolean managed,
282                                                                 boolean useHttps,
283                                                                 boolean allowSelfSignedCerts) 
284                         throws IllegalArgumentException {
285                 
286                 if (servers == null || servers.isEmpty()) {
287                         throw new IllegalArgumentException("DMaaP Server(s) must be provided");
288                 }
289                 
290                 if (topic == null || topic.isEmpty()) {
291                         throw new IllegalArgumentException("A topic must be provided");
292                 }
293                 
294                 synchronized(this) {
295                         if (dmaapTopicSources.containsKey(topic)) {
296                                 return dmaapTopicSources.get(topic);
297                         }
298                         
299                         DmaapTopicSource dmaapTopicSource = 
300                                         new SingleThreadedDmaapTopicSource(servers, topic, 
301                                                                                                          apiKey, apiSecret, userName, password,
302                                                                                                          consumerGroup, consumerInstance, 
303                                                                                                          fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
304                         
305                         if (managed)
306                                 dmaapTopicSources.put(topic, dmaapTopicSource);
307                         
308                         return dmaapTopicSource;
309                 }
310         }
311         
312         /**
313          * {@inheritDoc}
314          */
315         @Override
316         public List<DmaapTopicSource> build(Properties properties) 
317                         throws IllegalArgumentException {
318                 
319                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
320                 if (readTopics == null || readTopics.isEmpty()) {
321                         logger.warn("No topic for UEB Source " + properties);
322                         return new ArrayList<DmaapTopicSource>();
323                 }
324                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
325                 
326                 List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>();
327                 synchronized(this) {
328                         for (String topic: readTopicList) {
329                                 if (this.dmaapTopicSources.containsKey(topic)) {
330                                         dmaapTopicSource_s.add(this.dmaapTopicSources.get(topic));
331                                         continue;
332                                 }
333                                 
334                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + 
335                                                         topic + 
336                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
337                                 
338                                 List<String> serverList;
339                                 if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
340                                 else serverList = new ArrayList<>();
341                                 
342                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
343                                                                                    "." + topic + 
344                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
345                                 
346                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
347                                                                                   "." + topic + 
348                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
349                                 
350                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
351                                                                                           "." + topic + 
352                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
353
354                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
355                                                                                           "." + topic + 
356                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
357                                 
358                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
359                                                                                       "." + topic + 
360                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
361                                 
362                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
363                                                                                          "." + topic + 
364                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
365                                 
366                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
367                                                                                            "." + topic + 
368                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
369                                 
370                                 /* DME2 Properties */
371                                 
372                                 String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
373                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
374
375                                 String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
376                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
377
378                                 String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
379                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
380                                 
381                                 String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
382                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
383
384                                 String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
385                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
386
387                                 String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
388                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
389
390                                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
391                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
392
393                                 String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
394                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
395
396                                 String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS
397                                                 + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
398
399                                 String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
400                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
401
402                                 String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
403                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
404
405                                 String dme2SessionStickinessRequired = properties
406                                                 .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
407                                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
408                                 
409                                 Map<String,String> dme2AdditionalProps = new HashMap<>();
410                                 
411                                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
412                                         dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
413                                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
414                                         dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
415                                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
416                                         dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
417                                 if (dme2Version != null && !dme2Version.isEmpty())
418                                         dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
419                                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
420                                         dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
421                                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
422                                         dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
423                                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
424                                         dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
425                                 
426                                 
427                                 if (servers == null || servers.isEmpty()) {
428
429                                         logger.error("No DMaaP servers or DME2 ServiceName provided");
430                                         continue;
431                                 }
432                                 
433                                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
434                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
435                                         try {
436                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
437                                         } catch (NumberFormatException nfe) {
438                                                 logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
439                                         }
440                                 }
441                                         
442                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
443                                                                  "." + topic + 
444                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
445                                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
446                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
447                                         try {
448                                                 fetchLimit = Integer.parseInt(fetchLimitString);
449                                         } catch (NumberFormatException nfe) {
450                                                 logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
451                                         }
452                                 }
453                                 
454                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
455                                                                "." + topic + 
456                                                               PolicyProperties.PROPERTY_MANAGED_SUFFIX);
457                                 boolean managed = true;
458                                 if (managedString != null && !managedString.isEmpty()) {
459                                         managed = Boolean.parseBoolean(managedString);
460                                 }
461                                 
462                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
463                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
464
465                                         //default is to use HTTP if no https property exists
466                                 boolean useHttps = false;
467                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
468                                         useHttps = Boolean.parseBoolean(useHttpsString);
469                                 }
470                                 
471                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
472                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
473
474                                         //default is to disallow self-signed certs 
475                                 boolean allowSelfSignedCerts = false;
476                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
477                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
478                                 }                               
479                                 
480                                 
481                                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, 
482                                                                                                            apiKey, apiSecret, aafMechId, aafPassword,
483                                                                                                            consumerGroup, consumerInstance, 
484                                                                                                            fetchTimeout, fetchLimit, 
485                                                                                                            dme2Environment, dme2AftEnvironment, dme2Partner,
486                                                                                                            dme2Latitude, dme2Longitude, dme2AdditionalProps,
487                                                                                                            managed, useHttps, allowSelfSignedCerts);
488                                 
489                                 dmaapTopicSource_s.add(uebTopicSource);
490                         }
491                 }
492                 return dmaapTopicSource_s;
493         }
494         
495         /**
496          * {@inheritDoc}
497          * @throws IllegalArgumentException 
498          */
499         @Override
500         public DmaapTopicSource build(List<String> servers, 
501                                                                 String topic,
502                                                                 String apiKey, 
503                                                                 String apiSecret) throws IllegalArgumentException {
504                 return this.build(servers, topic, 
505                                                   apiKey, apiSecret, null, null,
506                                                   null, null,
507                                                   DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
508                                                   DmaapTopicSource.DEFAULT_LIMIT_FETCH,
509                                                   true,
510                                                   false,
511                                                   false);
512         }
513
514         /**
515          * {@inheritDoc}
516          * @throws IllegalArgumentException 
517          */
518         @Override
519         public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException {
520                 return this.build(servers, topic, null, null);
521         }       
522
523         /**
524          * {@inheritDoc}
525          */
526         @Override
527         public void destroy(String topic) 
528                    throws IllegalArgumentException {
529                 
530                 if (topic == null || topic.isEmpty()) {
531                         throw new IllegalArgumentException("A topic must be provided");
532                 }
533                 
534                 DmaapTopicSource uebTopicSource;
535                 
536                 synchronized(this) {
537                         if (!dmaapTopicSources.containsKey(topic)) {
538                                 return;
539                         }
540                         
541                         uebTopicSource = dmaapTopicSources.remove(topic);
542                 }
543                 
544                 uebTopicSource.shutdown();
545         }
546
547         /**
548          * {@inheritDoc}
549          */
550         @Override
551         public DmaapTopicSource get(String topic) 
552                throws IllegalArgumentException, IllegalStateException {
553                 
554                 if (topic == null || topic.isEmpty()) {
555                         throw new IllegalArgumentException("A topic must be provided");
556                 }
557                 
558                 synchronized(this) {
559                         if (dmaapTopicSources.containsKey(topic)) {
560                                 return dmaapTopicSources.get(topic);
561                         } else {
562                                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
563                         }
564                 }
565         }
566
567         @Override
568         public synchronized List<DmaapTopicSource> inventory() {
569                  List<DmaapTopicSource> readers = 
570                                  new ArrayList<DmaapTopicSource>(this.dmaapTopicSources.values());
571                  return readers;
572         }
573
574         @Override
575         public void destroy() {
576                 List<DmaapTopicSource> readers = this.inventory();
577                 for (DmaapTopicSource reader: readers) {
578                         reader.shutdown();
579                 }
580                 
581                 synchronized(this) {
582                         this.dmaapTopicSources.clear();
583                 }
584         }
585         
586 }
587