2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.openecomp.policy.drools.event.comm.bus;
 
  23 import java.util.ArrayList;
 
  24 import java.util.Arrays;
 
  25 import java.util.HashMap;
 
  26 import java.util.List;
 
  28 import java.util.Properties;
 
  30 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
 
  31 import org.slf4j.LoggerFactory;
 
  32 import org.slf4j.Logger;
 
  33 import org.openecomp.policy.drools.properties.PolicyProperties;
 
  36  * DMAAP Topic Source Factory
 
  38 public interface DmaapTopicSourceFactory {
 
  39         public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
 
  40         public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
 
  41         public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
 
  42         public final String DME2_VERSION_PROPERTY = "Version";
 
  43         public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
 
  44         public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
 
  45         public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
 
  46         public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
 
  49          * Creates an DMAAP Topic Source based on properties files
 
  51          * @param properties Properties containing initialization values
 
  53          * @return an DMAAP Topic Source
 
  54          * @throws IllegalArgumentException if invalid parameters are present
 
  56         public List<DmaapTopicSource> build(Properties properties)
 
  57                         throws IllegalArgumentException;
 
  60          * Instantiates a new DMAAP Topic Source
 
  62          * @param servers list of servers
 
  63          * @param topic topic name
 
  64          * @param apiKey API Key
 
  65          * @param apiSecret API Secret
 
  66          * @param userName user name
 
  67          * @param password password
 
  68          * @param consumerGroup Consumer Group
 
  69          * @param consumerInstance Consumer Instance
 
  70          * @param fetchTimeout Read Fetch Timeout
 
  71          * @param fetchLimit Fetch Limit
 
  72          * @param managed is this endpoind managed?
 
  73          * @param useHttps does the connection use HTTPS?
 
  74          * @param allowSelfSignedCerts does connection allow self-signed certificates?
 
  76          * @return an DMAAP Topic Source
 
  77          * @throws IllegalArgumentException if invalid parameters are present
 
  79         public DmaapTopicSource build(List<String> servers, 
 
  86                                                                 String consumerInstance,
 
  91                                                                 boolean allowSelfSignedCerts)
 
  92                         throws IllegalArgumentException;
 
  95          * Instantiates a new DMAAP Topic Source
 
  97          * @param servers list of servers
 
  98          * @param topic topic name
 
  99          * @param apiKey API Key
 
 100          * @param apiSecret API Secret
 
 101          * @param userName user name
 
 102          * @param password password
 
 103          * @param consumerGroup Consumer Group
 
 104          * @param consumerInstance Consumer Instance
 
 105          * @param fetchTimeout Read Fetch Timeout
 
 106          * @param fetchLimit Fetch Limit
 
 107          * @param environment DME2 environment
 
 108          * @param aftEnvironment DME2 AFT environment
 
 109          * @param partner DME2 Partner
 
 110          * @param latitude DME2 latitude
 
 111          * @param longitude DME2 longitude
 
 112          * @param additionalProps additional properties to pass to DME2
 
 113          * @param managed is this endpoind managed?
 
 114          * @param useHttps does the connection use HTTPS?
 
 115          * @param allowSelfSignedCerts does connection allow self-signed certificates?
 
 117          * @return an DMAAP Topic Source
 
 118          * @throws IllegalArgumentException if invalid parameters are present
 
 120         public DmaapTopicSource build(List<String> servers, 
 
 126                                                                 String consumerGroup, 
 
 127                                                                 String consumerInstance,
 
 131                                                                 String aftEnvironment,
 
 135                                                                 Map<String,String> additionalProps,
 
 138                                                                 boolean allowSelfSignedCerts)
 
 139                         throws IllegalArgumentException;
 
 142          * Instantiates a new DMAAP Topic Source
 
 144          * @param servers list of servers
 
 145          * @param topic topic name
 
 146          * @param apiKey API Key
 
 147          * @param apiSecret API Secret
 
 149          * @return an DMAAP Topic Source
 
 150          * @throws IllegalArgumentException if invalid parameters are present
 
 152         public DmaapTopicSource build(List<String> servers, 
 
 156                         throws IllegalArgumentException;
 
 159          * Instantiates a new DMAAP Topic Source
 
 161          * @param uebTopicReaderType Implementation type
 
 162          * @param servers list of servers
 
 163          * @param topic topic name
 
 165          * @return an DMAAP Topic Source
 
 166          * @throws IllegalArgumentException if invalid parameters are present
 
 168         public DmaapTopicSource build(List<String> servers,
 
 170                         throws IllegalArgumentException;        
 
 173          * Destroys an DMAAP Topic Source based on a topic
 
 175          * @param topic topic name
 
 176          * @throws IllegalArgumentException if invalid parameters are present
 
 178         public void destroy(String topic);
 
 181          * Destroys all DMAAP Topic Sources
 
 183         public void destroy();
 
 186          * gets an DMAAP Topic Source based on topic name
 
 187          * @param topic the topic name
 
 188          * @return an DMAAP Topic Source with topic name
 
 189          * @throws IllegalArgumentException if an invalid topic is provided
 
 190          * @throws IllegalStateException if the DMAAP Topic Source is 
 
 193         public DmaapTopicSource get(String topic)
 
 194                    throws IllegalArgumentException, IllegalStateException;
 
 197          * Provides a snapshot of the DMAAP Topic Sources
 
 198          * @return a list of the DMAAP Topic Sources
 
 200         public List<DmaapTopicSource> inventory();
 
 204 /* ------------- implementation ----------------- */
 
 207  * Factory of DMAAP Source Topics indexed by topic name
 
 210 class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
 
 214         private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSourceFactory.class);   
 
 217          * DMaaP Topic Name Index
 
 219         protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
 
 220                         new HashMap<String, DmaapTopicSource>();
 
 226         public DmaapTopicSource build(List<String> servers, 
 
 232                                                                 String consumerGroup, 
 
 233                                                                 String consumerInstance,
 
 237                                                                 String aftEnvironment,
 
 241                                                                 Map<String,String> additionalProps,
 
 244                                                                 boolean allowSelfSignedCerts) 
 
 245                         throws IllegalArgumentException {
 
 247                 if (topic == null || topic.isEmpty()) {
 
 248                         throw new IllegalArgumentException("A topic must be provided");
 
 252                         if (dmaapTopicSources.containsKey(topic)) {
 
 253                                 return dmaapTopicSources.get(topic);
 
 256                         DmaapTopicSource dmaapTopicSource = 
 
 257                                         new SingleThreadedDmaapTopicSource(servers, topic, 
 
 258                                                                                                          apiKey, apiSecret, userName, password,
 
 259                                                                                                          consumerGroup, consumerInstance, 
 
 260                                                                                                          fetchTimeout, fetchLimit,
 
 261                                                                                                          environment, aftEnvironment, partner,
 
 262                                                                                                          latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
 
 265                                 dmaapTopicSources.put(topic, dmaapTopicSource);
 
 267                         return dmaapTopicSource;
 
 274         public DmaapTopicSource build(List<String> servers, 
 
 280                                                                 String consumerGroup, 
 
 281                                                                 String consumerInstance,
 
 286                                                                 boolean allowSelfSignedCerts) 
 
 287                         throws IllegalArgumentException {
 
 289                 if (servers == null || servers.isEmpty()) {
 
 290                         throw new IllegalArgumentException("DMaaP Server(s) must be provided");
 
 293                 if (topic == null || topic.isEmpty()) {
 
 294                         throw new IllegalArgumentException("A topic must be provided");
 
 298                         if (dmaapTopicSources.containsKey(topic)) {
 
 299                                 return dmaapTopicSources.get(topic);
 
 302                         DmaapTopicSource dmaapTopicSource = 
 
 303                                         new SingleThreadedDmaapTopicSource(servers, topic, 
 
 304                                                                                                          apiKey, apiSecret, userName, password,
 
 305                                                                                                          consumerGroup, consumerInstance, 
 
 306                                                                                                          fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
 
 309                                 dmaapTopicSources.put(topic, dmaapTopicSource);
 
 311                         return dmaapTopicSource;
 
 319         public List<DmaapTopicSource> build(Properties properties) 
 
 320                         throws IllegalArgumentException {
 
 322                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
 
 323                 if (readTopics == null || readTopics.isEmpty()) {
 
 324                         logger.info("{}: no topic for DMaaP Source", this);
 
 325                         return new ArrayList<DmaapTopicSource>();
 
 327                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
 
 329                 List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>();
 
 331                         for (String topic: readTopicList) {
 
 332                                 if (this.dmaapTopicSources.containsKey(topic)) {
 
 333                                         dmaapTopicSource_s.add(this.dmaapTopicSources.get(topic));
 
 337                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + 
 
 339                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
 
 341                                 List<String> serverList;
 
 342                                 if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
 
 343                                 else serverList = new ArrayList<>();
 
 345                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 347                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
 
 349                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 351                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
 
 353                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 355                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
 
 357                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 359                                                                                           PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
 
 361                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 363                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
 
 365                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 367                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
 
 369                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 371                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
 
 373                                 /* DME2 Properties */
 
 375                                 String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
 
 376                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
 
 378                                 String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
 
 379                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
 
 381                                 String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
 
 382                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
 
 384                                 String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
 
 385                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
 
 387                                 String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
 
 388                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
 
 390                                 String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
 
 391                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
 
 393                                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
 
 394                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
 
 396                                 String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
 
 397                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
 
 399                                 String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS
 
 400                                                 + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
 
 402                                 String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
 
 403                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
 
 405                                 String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
 
 406                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
 
 408                                 String dme2SessionStickinessRequired = properties
 
 409                                                 .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
 
 410                                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
 
 412                                 Map<String,String> dme2AdditionalProps = new HashMap<>();
 
 414                                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
 
 415                                         dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
 
 416                                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
 
 417                                         dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
 
 418                                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
 
 419                                         dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
 
 420                                 if (dme2Version != null && !dme2Version.isEmpty())
 
 421                                         dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
 
 422                                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
 
 423                                         dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
 
 424                                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
 
 425                                         dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
 
 426                                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
 
 427                                         dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
 
 430                                 if (servers == null || servers.isEmpty()) {
 
 432                                         logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
 
 436                                 int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
 
 437                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
 
 439                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
 
 440                                         } catch (NumberFormatException nfe) {
 
 441                                                 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", 
 
 442                                                                     this, fetchTimeoutString, topic);
 
 446                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 448                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
 
 449                                 int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
 
 450                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
 
 452                                                 fetchLimit = Integer.parseInt(fetchLimitString);
 
 453                                         } catch (NumberFormatException nfe) {
 
 454                                                 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", 
 
 455                                                                 this, fetchLimitString, topic);
 
 459                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
 
 461                                                               PolicyProperties.PROPERTY_MANAGED_SUFFIX);
 
 462                                 boolean managed = true;
 
 463                                 if (managedString != null && !managedString.isEmpty()) {
 
 464                                         managed = Boolean.parseBoolean(managedString);
 
 467                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
 
 468                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
 
 470                                         //default is to use HTTP if no https property exists
 
 471                                 boolean useHttps = false;
 
 472                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
 
 473                                         useHttps = Boolean.parseBoolean(useHttpsString);
 
 476                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic +
 
 477                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
 
 479                                         //default is to disallow self-signed certs 
 
 480                                 boolean allowSelfSignedCerts = false;
 
 481                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
 
 482                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
 
 486                                 DmaapTopicSource uebTopicSource = this.build(serverList, topic, 
 
 487                                                                                                            apiKey, apiSecret, aafMechId, aafPassword,
 
 488                                                                                                            consumerGroup, consumerInstance, 
 
 489                                                                                                            fetchTimeout, fetchLimit, 
 
 490                                                                                                            dme2Environment, dme2AftEnvironment, dme2Partner,
 
 491                                                                                                            dme2Latitude, dme2Longitude, dme2AdditionalProps,
 
 492                                                                                                            managed, useHttps, allowSelfSignedCerts);
 
 494                                 dmaapTopicSource_s.add(uebTopicSource);
 
 497                 return dmaapTopicSource_s;
 
 502          * @throws IllegalArgumentException 
 
 505         public DmaapTopicSource build(List<String> servers, 
 
 508                                                                 String apiSecret) throws IllegalArgumentException {
 
 509                 return this.build(servers, topic, 
 
 510                                                   apiKey, apiSecret, null, null,
 
 512                                                   DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
 
 513                                                   DmaapTopicSource.DEFAULT_LIMIT_FETCH,
 
 521          * @throws IllegalArgumentException 
 
 524         public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException {
 
 525                 return this.build(servers, topic, null, null);
 
 532         public void destroy(String topic) 
 
 533                    throws IllegalArgumentException {
 
 535                 if (topic == null || topic.isEmpty()) {
 
 536                         throw new IllegalArgumentException("A topic must be provided");
 
 539                 DmaapTopicSource uebTopicSource;
 
 542                         if (!dmaapTopicSources.containsKey(topic)) {
 
 546                         uebTopicSource = dmaapTopicSources.remove(topic);
 
 549                 uebTopicSource.shutdown();
 
 556         public DmaapTopicSource get(String topic) 
 
 557                throws IllegalArgumentException, IllegalStateException {
 
 559                 if (topic == null || topic.isEmpty()) {
 
 560                         throw new IllegalArgumentException("A topic must be provided");
 
 564                         if (dmaapTopicSources.containsKey(topic)) {
 
 565                                 return dmaapTopicSources.get(topic);
 
 567                                 throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found");
 
 573         public synchronized List<DmaapTopicSource> inventory() {
 
 574                  List<DmaapTopicSource> readers = 
 
 575                                  new ArrayList<DmaapTopicSource>(this.dmaapTopicSources.values());
 
 580         public void destroy() {
 
 581                 List<DmaapTopicSource> readers = this.inventory();
 
 582                 for (DmaapTopicSource reader: readers) {
 
 587                         this.dmaapTopicSources.clear();
 
 591         public String toString() {
 
 592                 StringBuilder builder = new StringBuilder();
 
 593                 builder.append("IndexedDmaapTopicSourceFactory []");
 
 594                 return builder.toString();