[POLICY-52] pdp-d: PolicyEngine junits
[policy/drools-pdp.git] / policy-endpoints / src / main / java / org / onap / policy / drools / event / comm / bus / DmaapTopicSourceFactory.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus;
22
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Properties;
29
30 import org.onap.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
31 import org.slf4j.LoggerFactory;
32 import org.slf4j.Logger;
33 import org.onap.policy.drools.properties.PolicyProperties;
34
35 /**
36  * DMAAP Topic Source Factory
37  */
38 public interface DmaapTopicSourceFactory {
39         public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
40         public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
41         public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
42         public final String DME2_VERSION_PROPERTY = "Version";
43         public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
44         public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
45         public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
46         public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
47         
48         /**
49          * Creates an DMAAP Topic Source based on properties files
50          * 
51          * @param properties Properties containing initialization values
52          * 
53          * @return an DMAAP Topic Source
54          * @throws IllegalArgumentException if invalid parameters are present
55          */
56         public List<DmaapTopicSource> build(Properties properties)
57                         throws IllegalArgumentException;
58         
59         /**
60          * Instantiates a new DMAAP Topic Source
61          * 
62          * @param servers list of servers
63          * @param topic topic name
64          * @param apiKey API Key
65          * @param apiSecret API Secret
66          * @param userName user name
67          * @param password password
68          * @param consumerGroup Consumer Group
69          * @param consumerInstance Consumer Instance
70          * @param fetchTimeout Read Fetch Timeout
71          * @param fetchLimit Fetch Limit
72          * @param managed is this endpoind managed?
73          * @param useHttps does the connection use HTTPS?
74          * @param allowSelfSignedCerts does connection allow self-signed certificates?
75          * 
76          * @return an DMAAP Topic Source
77          * @throws IllegalArgumentException if invalid parameters are present
78          */
79         public DmaapTopicSource build(List<String> servers, 
80                                                                 String topic, 
81                                                                 String apiKey, 
82                                                                 String apiSecret, 
83                                                                 String userName, 
84                                                                 String password,
85                                                                 String consumerGroup, 
86                                                                 String consumerInstance,
87                                                                 int fetchTimeout,
88                                                                 int fetchLimit,
89                                                                 boolean managed,
90                                                                 boolean useHttps,
91                                                                 boolean allowSelfSignedCerts)
92                         throws IllegalArgumentException;
93         
94         /**
95          * Instantiates a new DMAAP Topic Source
96          * 
97          * @param servers list of servers
98          * @param topic topic name
99          * @param apiKey API Key
100          * @param apiSecret API Secret
101          * @param userName user name
102          * @param password password
103          * @param consumerGroup Consumer Group
104          * @param consumerInstance Consumer Instance
105          * @param fetchTimeout Read Fetch Timeout
106          * @param fetchLimit Fetch Limit
107          * @param environment DME2 environment
108          * @param aftEnvironment DME2 AFT environment
109          * @param partner DME2 Partner
110          * @param latitude DME2 latitude
111          * @param longitude DME2 longitude
112          * @param additionalProps additional properties to pass to DME2
113          * @param managed is this endpoind managed?
114          * @param useHttps does the connection use HTTPS?
115          * @param allowSelfSignedCerts does connection allow self-signed certificates?
116          * 
117          * @return an DMAAP Topic Source
118          * @throws IllegalArgumentException if invalid parameters are present
119          */
120         public DmaapTopicSource build(List<String> servers, 
121                                                                 String topic, 
122                                                                 String apiKey, 
123                                                                 String apiSecret, 
124                                                                 String userName, 
125                                                                 String password,
126                                                                 String consumerGroup, 
127                                                                 String consumerInstance,
128                                                                 int fetchTimeout,
129                                                                 int fetchLimit,
130                                                                 String environment,
131                                                                 String aftEnvironment,
132                                                                 String partner,
133                                                                 String latitude,
134                                                                 String longitude,
135                                                                 Map<String,String> additionalProps,
136                                                                 boolean managed,
137                                                                 boolean useHttps, 
138                                                                 boolean allowSelfSignedCerts)
139                         throws IllegalArgumentException;
140         
141         /**
142          * Instantiates a new DMAAP Topic Source
143          * 
144          * @param servers list of servers
145          * @param topic topic name
146          * @param apiKey API Key
147          * @param apiSecret API Secret
148          * 
149          * @return an DMAAP Topic Source
150          * @throws IllegalArgumentException if invalid parameters are present
151          */
152         public DmaapTopicSource build(List<String> servers, 
153                                                                 String topic, 
154                                                                 String apiKey, 
155                                                                 String apiSecret)
156                         throws IllegalArgumentException;
157
158         /**
159          * Instantiates a new DMAAP Topic Source
160          * 
161          * @param uebTopicReaderType Implementation type
162          * @param servers list of servers
163          * @param topic topic name
164          * 
165          * @return an DMAAP Topic Source
166          * @throws IllegalArgumentException if invalid parameters are present
167          */
168         public DmaapTopicSource build(List<String> servers,
169                                                                 String topic)
170                         throws IllegalArgumentException;        
171         
172         /**
173          * Destroys an DMAAP Topic Source based on a topic
174          * 
175          * @param topic topic name
176          * @throws IllegalArgumentException if invalid parameters are present
177          */
178         public void destroy(String topic);
179         
180         /**
181          * Destroys all DMAAP Topic Sources
182          */
183         public void destroy();
184         
185         /**
186          * gets an DMAAP Topic Source based on topic name
187          * @param topic the topic name
188          * @return an DMAAP Topic Source with topic name
189          * @throws IllegalArgumentException if an invalid topic is provided
190          * @throws IllegalStateException if the DMAAP Topic Source is 
191          * an incorrect state
192          */
193         public DmaapTopicSource get(String topic)
194                    throws IllegalArgumentException, IllegalStateException;
195         
196         /**
197          * Provides a snapshot of the DMAAP Topic Sources
198          * @return a list of the DMAAP Topic Sources
199          */
200         public List<DmaapTopicSource> inventory();
201 }
202
203
204 /* ------------- implementation ----------------- */
205
206 /**
207  * Factory of DMAAP Source Topics indexed by topic name
208  */
209
210 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
211         /**
212          * Logger 
213          */
214         private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);   
215         
216         /**
217          * DMaaP Topic Name Index
218          */
219         protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
220                         new HashMap<String, DmaapTopicSource>();
221         
222         /**
223          * {@inheritDoc}
224          */
225         @Override
226         public DmaapTopicSource build(List<String> servers, 
227                                                                 String topic, 
228                                                                 String apiKey, 
229                                                                 String apiSecret, 
230                                                                 String userName, 
231                                                                 String password,
232                                                                 String consumerGroup, 
233                                                                 String consumerInstance,
234                                                                 int fetchTimeout,
235                                                                 int fetchLimit,
236                                                                 String environment,
237                                                                 String aftEnvironment,
238                                                                 String partner,
239                                                                 String latitude,
240                                                                 String longitude,
241                                                                 Map<String,String> additionalProps,
242                                                                 boolean managed,
243                                                                 boolean useHttps,
244                                                                 boolean allowSelfSignedCerts) 
245                         throws IllegalArgumentException {
246                 
247                 if (topic == null || topic.isEmpty()) {
248                         throw new IllegalArgumentException("A topic must be provided");
249                 }
250                 
251                 synchronized(this) {
252                         if (dmaapTopicSources.containsKey(topic)) {
253                                 return dmaapTopicSources.get(topic);
254                         }
255                         
256                         DmaapTopicSource dmaapTopicSource = 
257                                         new SingleThreadedDmaapTopicSource(servers, topic, 
258                                                                                                          apiKey, apiSecret, userName, password,
259                                                                                                          consumerGroup, consumerInstance, 
260                                                                                                          fetchTimeout, fetchLimit,
261                                                                                                          environment, aftEnvironment, partner,
262                                                                                                          latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
263                         
264                         if (managed)
265                                 dmaapTopicSources.put(topic, dmaapTopicSource);
266                         
267                         return dmaapTopicSource;
268                 }
269         }
270         /**
271          * {@inheritDoc}
272          */
273         @Override
274         public DmaapTopicSource build(List<String> servers, 
275                                                                 String topic, 
276                                                                 String apiKey, 
277                                                                 String apiSecret, 
278                                                                 String userName, 
279                                                                 String password,
280                                                                 String consumerGroup, 
281                                                                 String consumerInstance,
282                                                                 int fetchTimeout,
283                                                                 int fetchLimit,
284                                                                 boolean managed,
285                                                                 boolean useHttps,
286                                                                 boolean allowSelfSignedCerts) 
287                         throws IllegalArgumentException {
288                 
289                 if (servers == null || servers.isEmpty()) {
290                         throw new IllegalArgumentException("DMaaP Server(s) must be provided");
291                 }
292                 
293                 if (topic == null || topic.isEmpty()) {
294                         throw new IllegalArgumentException("A topic must be provided");
295                 }
296                 
297                 synchronized(this) {
298                         if (dmaapTopicSources.containsKey(topic)) {
299                                 return dmaapTopicSources.get(topic);
300                         }
301                         
302                         DmaapTopicSource dmaapTopicSource = 
303                                         new SingleThreadedDmaapTopicSource(servers, topic, 
304                                                                                                          apiKey, apiSecret, userName, password,
305                                                                                                          consumerGroup, consumerInstance, 
306                                                                                                          fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
307                         
308                         if (managed)
309                                 dmaapTopicSources.put(topic, dmaapTopicSource);
310                         
311                         return dmaapTopicSource;
312                 }
313         }
314         
315         /**
316          * {@inheritDoc}
317          */
318         @Override
319         public List<DmaapTopicSource> build(Properties properties) 
320                         throws IllegalArgumentException {
321                 
322                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
323                 if (readTopics == null || readTopics.isEmpty()) {
324                         logger.info("{}: no topic for DMaaP Source", this);
325                         return new ArrayList<DmaapTopicSource>();
326                 }
327                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
328                 
329                 List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>();
330                 synchronized(this) {
331                         for (String topic: readTopicList) {
332                                 if (this.dmaapTopicSources.containsKey(topic)) {
333                                         dmaapTopicSource_s.add(this.dmaapTopicSources.get(topic));
334                                         continue;
335                                 }
336                                 
337                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + 
338                                                         topic + 
339                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
340                                 
341                                 List<String> serverList;
342                                 if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
343                                 else serverList = new ArrayList<>();
344                                 
345                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
346                                                                                    "." + topic + 
347                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
348                                 
349                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
350                                                                                   "." + topic + 
351                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
352                                 
353                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
354                                                                                           "." + topic + 
355                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
356
357                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
358                                                                                           "." + topic + 
359                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
360                                 
361                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
362                                                                                       "." + topic + 
363                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
364                                 
365                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
366                                                                                          "." + topic + 
367                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
368                                 
369                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
370                                                                                            "." + topic + 
371                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
372                                 
373                                 /* DME2 Properties */
374                                 
375                                 String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
376                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
377
378                                 String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
379                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
380
381                                 String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
382                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
383                                 
384                                 String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
385                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
386
387                                 String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
388                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
389
390                                 String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
391                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
392
393                                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
394                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
395
396                                 String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
397                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
398
399                                 String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS
400                                                 + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
401
402                                 String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
403                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
404
405                                 String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
406                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
407
408                                 String dme2SessionStickinessRequired = properties
409                                                 .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
410                                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
411                                 
412                                 Map<String,String> dme2AdditionalProps = new HashMap<>();
413                                 
414                                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
415                                         dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
416                                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
417                                         dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
418                                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
419                                         dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
420                                 if (dme2Version != null && !dme2Version.isEmpty())
421                                         dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
422                                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
423                                         dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
424                                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
425                                         dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
426                                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
427                                         dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
428                                 
429                                 
430                                 if (servers == null || servers.isEmpty()) {
431
432                                         logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
433                                         continue;
434                                 }
435                                 
436                                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
437                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
438                                         try {
439                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
440                                         } catch (NumberFormatException nfe) {
441                                                 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", 
442                                                                     this, fetchTimeoutString, topic);
443                                         }
444                                 }
445                                         
446                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
447                                                                  "." + topic + 
448                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
449                                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
450                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
451                                         try {
452                                                 fetchLimit = Integer.parseInt(fetchLimitString);
453                                         } catch (NumberFormatException nfe) {
454                                                 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", 
455                                                                 this, fetchLimitString, topic);
456                                         }
457                                 }
458                                 
459                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
460                                                                "." + topic + 
461                                                               PolicyProperties.PROPERTY_MANAGED_SUFFIX);
462                                 boolean managed = true;
463                                 if (managedString != null && !managedString.isEmpty()) {
464                                         managed = Boolean.parseBoolean(managedString);
465                                 }
466                                 
467                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
468                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
469
470                                         //default is to use HTTP if no https property exists
471                                 boolean useHttps = false;
472                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
473                                         useHttps = Boolean.parseBoolean(useHttpsString);
474                                 }
475                                 
476                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
477                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
478
479                                         //default is to disallow self-signed certs 
480                                 boolean allowSelfSignedCerts = false;
481                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
482                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
483                                 }                               
484                                 
485                                 
486                                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, 
487                                                                                                            apiKey, apiSecret, aafMechId, aafPassword,
488                                                                                                            consumerGroup, consumerInstance, 
489                                                                                                            fetchTimeout, fetchLimit, 
490                                                                                                            dme2Environment, dme2AftEnvironment, dme2Partner,
491                                                                                                            dme2Latitude, dme2Longitude, dme2AdditionalProps,
492                                                                                                            managed, useHttps, allowSelfSignedCerts);
493                                 
494                                 dmaapTopicSource_s.add(uebTopicSource);
495                         }
496                 }
497                 return dmaapTopicSource_s;
498         }
499         
500         /**
501          * {@inheritDoc}
502          * @throws IllegalArgumentException 
503          */
504         @Override
505         public DmaapTopicSource build(List<String> servers, 
506                                                                 String topic,
507                                                                 String apiKey, 
508                                                                 String apiSecret) throws IllegalArgumentException {
509                 return this.build(servers, topic, 
510                                                   apiKey, apiSecret, null, null,
511                                                   null, null,
512                                                   DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
513                                                   DmaapTopicSource.DEFAULT_LIMIT_FETCH,
514                                                   true,
515                                                   false,
516                                                   false);
517         }
518
519         /**
520          * {@inheritDoc}
521          * @throws IllegalArgumentException 
522          */
523         @Override
524         public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException {
525                 return this.build(servers, topic, null, null);
526         }       
527
528         /**
529          * {@inheritDoc}
530          */
531         @Override
532         public void destroy(String topic) 
533                    throws IllegalArgumentException {
534                 
535                 if (topic == null || topic.isEmpty()) {
536                         throw new IllegalArgumentException("A topic must be provided");
537                 }
538                 
539                 DmaapTopicSource uebTopicSource;
540                 
541                 synchronized(this) {
542                         if (!dmaapTopicSources.containsKey(topic)) {
543                                 return;
544                         }
545                         
546                         uebTopicSource = dmaapTopicSources.remove(topic);
547                 }
548                 
549                 uebTopicSource.shutdown();
550         }
551
552         /**
553          * {@inheritDoc}
554          */
555         @Override
556         public DmaapTopicSource get(String topic) 
557                throws IllegalArgumentException, IllegalStateException {
558                 
559                 if (topic == null || topic.isEmpty()) {
560                         throw new IllegalArgumentException("A topic must be provided");
561                 }
562                 
563                 synchronized(this) {
564                         if (dmaapTopicSources.containsKey(topic)) {
565                                 return dmaapTopicSources.get(topic);
566                         } else {
567                                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
568                         }
569                 }
570         }
571
572         @Override
573         public synchronized List<DmaapTopicSource> inventory() {
574                  List<DmaapTopicSource> readers = 
575                                  new ArrayList<DmaapTopicSource>(this.dmaapTopicSources.values());
576                  return readers;
577         }
578
579         @Override
580         public void destroy() {
581                 List<DmaapTopicSource> readers = this.inventory();
582                 for (DmaapTopicSource reader: readers) {
583                         reader.shutdown();
584                 }
585                 
586                 synchronized(this) {
587                         this.dmaapTopicSources.clear();
588                 }
589         }
590         @Override
591         public String toString() {
592                 StringBuilder builder = new StringBuilder();
593                 builder.append("IndexedDmaapTopicSourceFactory []");
594                 return builder.toString();
595         }
596         
597 }
598