2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.openecomp.policy.drools.event.comm.bus;
 
  23 import java.util.ArrayList;
 
  24 import java.util.Arrays;
 
  25 import java.util.HashMap;
 
  26 import java.util.List;
 
  27 import java.util.Properties;
 
  29 import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource;
 
  30 import org.slf4j.LoggerFactory;
 
  31 import org.slf4j.Logger;
 
  32 import org.openecomp.policy.drools.properties.PolicyProperties;
 
  35  * UEB Topic Source Factory
 
  37 public interface UebTopicSourceFactory {
 
  40          * Creates an UEB Topic Source based on properties files
 
  42          * @param properties Properties containing initialization values
 
  44          * @return an UEB Topic Source
 
  45          * @throws IllegalArgumentException if invalid parameters are present
 
  47         public List<UebTopicSource> build(Properties properties)
 
  48                         throws IllegalArgumentException;
 
  51          * Instantiates a new UEB Topic Source
 
  53          * @param servers list of servers
 
  54          * @param topic topic name
 
  55          * @param apiKey API Key
 
  56          * @param apiSecret API Secret
 
  57          * @param consumerGroup Consumer Group
 
  58          * @param consumerInstance Consumer Instance
 
  59          * @param fetchTimeout Read Fetch Timeout
 
  60          * @param fetchLimit Fetch Limit
 
  61          * @param managed is this source endpoint managed?
 
  63          * @return an UEB Topic Source
 
  64          * @throws IllegalArgumentException if invalid parameters are present
 
  66         public UebTopicSource build(List<String> servers, 
 
  71                                                                 String consumerInstance,
 
  76                                                                 boolean allowSelfSignedCerts)
 
  77                         throws IllegalArgumentException;
 
  80          * Instantiates a new UEB Topic Source
 
  82          * @param servers list of servers
 
  83          * @param topic topic name
 
  84          * @param apiKey API Key
 
  85          * @param apiSecret API Secret
 
  87          * @return an UEB Topic Source
 
  88          * @throws IllegalArgumentException if invalid parameters are present
 
  90         public UebTopicSource build(List<String> servers, 
 
  94                         throws IllegalArgumentException;
 
  97          * Instantiates a new UEB Topic Source
 
  99          * @param uebTopicSourceType Implementation type
 
 100          * @param servers list of servers
 
 101          * @param topic topic name
 
 103          * @return an UEB Topic Source
 
 104          * @throws IllegalArgumentException if invalid parameters are present
 
 106         public UebTopicSource build(List<String> servers, 
 
 108                         throws IllegalArgumentException;        
 
 111          * Destroys an UEB Topic Source based on a topic
 
 113          * @param topic topic name
 
 114          * @throws IllegalArgumentException if invalid parameters are present
 
 116         public void destroy(String topic);
 
 119          * Destroys all UEB Topic Sources
 
 121         public void destroy();
 
 124          * gets an UEB Topic Source based on topic name
 
 125          * @param topic the topic name
 
 126          * @return an UEB Topic Source with topic name
 
 127          * @throws IllegalArgumentException if an invalid topic is provided
 
 128          * @throws IllegalStateException if the UEB Topic Source is 
 
 131         public UebTopicSource get(String topic)
 
 132                    throws IllegalArgumentException, IllegalStateException;
 
 135          * Provides a snapshot of the UEB Topic Sources
 
 136          * @return a list of the UEB Topic Sources
 
 138         public List<UebTopicSource> inventory();
 
 141 /* ------------- implementation ----------------- */
 
 144  * Factory of UEB Source Topics indexed by topic name
 
 146 class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
 
 150         private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSourceFactory.class);     
 
 153          * UEB Topic Name Index
 
 155         protected HashMap<String, UebTopicSource> uebTopicSources =
 
 156                         new HashMap<String, UebTopicSource>();
 
 162         public UebTopicSource build(List<String> servers, 
 
 166                                                                 String consumerGroup, 
 
 167                                                                 String consumerInstance,
 
 172                                                                 boolean allowSelfSignedCerts) 
 
 173         throws IllegalArgumentException {
 
 174                 if (servers == null || servers.isEmpty()) {
 
 175                         throw new IllegalArgumentException("UEB Server(s) must be provided");
 
 178                 if (topic == null || topic.isEmpty()) {
 
 179                         throw new IllegalArgumentException("A topic must be provided");
 
 183                         if (uebTopicSources.containsKey(topic)) {
 
 184                                 return uebTopicSources.get(topic);
 
 187                         UebTopicSource uebTopicSource = 
 
 188                                         new SingleThreadedUebTopicSource(servers, topic, 
 
 190                                                                                                          consumerGroup, consumerInstance, 
 
 191                                                                                                          fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
 
 194                                 uebTopicSources.put(topic, uebTopicSource);
 
 196                         return uebTopicSource;
 
 204         public List<UebTopicSource> build(Properties properties) 
 
 205                         throws IllegalArgumentException {
 
 207                 String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
 
 208                 if (readTopics == null || readTopics.isEmpty()) {
 
 209                         logger.info("{}: no topic for UEB Source", this);
 
 210                         return new ArrayList<UebTopicSource>();
 
 212                 List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));               
 
 214                 List<UebTopicSource> newUebTopicSources = new ArrayList<UebTopicSource>();
 
 216                         for (String topic: readTopicList) {
 
 217                                 if (this.uebTopicSources.containsKey(topic)) {
 
 218                                         newUebTopicSources.add(this.uebTopicSources.get(topic));
 
 222                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + 
 
 224                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
 
 226                                 if (servers == null || servers.isEmpty()) {
 
 227                                         logger.error("{}: no UEB servers configured for sink {}", this, topic);
 
 231                                 List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
 
 233                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
 
 235                                                        PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
 
 237                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
 
 239                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
 
 241                                 String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
 
 243                                                               PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
 
 245                                 String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
 
 247                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
 
 249                                 String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
 
 251                                                                    PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
 
 252                                 int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
 
 253                                 if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
 
 255                                                 fetchTimeout = Integer.parseInt(fetchTimeoutString);
 
 256                                         } catch (NumberFormatException nfe) {
 
 257                                                 logger.warn("{}: fetch timeout {} is in invalid format for topic {} ", 
 
 258                                                                 this, fetchTimeoutString, topic);
 
 262                                 String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
 
 264                                                                  PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
 
 265                                 int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
 
 266                                 if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
 
 268                                                 fetchLimit = Integer.parseInt(fetchLimitString);
 
 269                                         } catch (NumberFormatException nfe) {
 
 270                                                 logger.warn("{}: fetch limit {} is in invalid format for topic {} ", 
 
 271                                                             this, fetchLimitString, topic);
 
 275                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
 
 276                                                                                       topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
 
 277                                 boolean managed = true;
 
 278                                 if (managedString != null && !managedString.isEmpty()) {
 
 279                                         managed = Boolean.parseBoolean(managedString);
 
 282                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
 
 283                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
 
 285                                         //default is to use HTTP if no https property exists
 
 286                                 boolean useHttps = false;
 
 287                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
 
 288                                         useHttps = Boolean.parseBoolean(useHttpsString);
 
 291                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic +
 
 292                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
 
 294                                         //default is to disallow self-signed certs 
 
 295                                 boolean allowSelfSignedCerts = false;
 
 296                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
 
 297                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
 
 300                                 UebTopicSource uebTopicSource = this.build(serverList, topic, 
 
 302                                                                                                            consumerGroup, consumerInstance, 
 
 303                                                                                                            fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts);
 
 304                                 newUebTopicSources.add(uebTopicSource);
 
 307                 return newUebTopicSources;
 
 314         public UebTopicSource build(List<String> servers, 
 
 319                 return this.build(servers, topic, 
 
 322                                                   UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
 
 323                                                   UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true);
 
 330         public UebTopicSource build(List<String> servers, String topic) {
 
 331                 return this.build(servers, topic, null, null);
 
 338         public void destroy(String topic) 
 
 339                    throws IllegalArgumentException {
 
 341                 if (topic == null || topic.isEmpty()) {
 
 342                         throw new IllegalArgumentException("A topic must be provided");
 
 345                 UebTopicSource uebTopicSource;
 
 348                         if (!uebTopicSources.containsKey(topic)) {
 
 352                         uebTopicSource = uebTopicSources.remove(topic);
 
 355                 uebTopicSource.shutdown();
 
 362         public UebTopicSource get(String topic) 
 
 363                throws IllegalArgumentException, IllegalStateException {
 
 365                 if (topic == null || topic.isEmpty()) {
 
 366                         throw new IllegalArgumentException("A topic must be provided");
 
 370                         if (uebTopicSources.containsKey(topic)) {
 
 371                                 return uebTopicSources.get(topic);
 
 373                                 throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
 
 379         public synchronized List<UebTopicSource> inventory() {
 
 380                  List<UebTopicSource> readers = 
 
 381                                  new ArrayList<UebTopicSource>(this.uebTopicSources.values());
 
 386         public void destroy() {
 
 387                 List<UebTopicSource> readers = this.inventory();
 
 388                 for (UebTopicSource reader: readers) {
 
 393                         this.uebTopicSources.clear();
 
 398         public String toString() {
 
 399                 StringBuilder builder = new StringBuilder();
 
 400                 builder.append("IndexedUebTopicSourceFactory []");
 
 401                 return builder.toString();