2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.openecomp.policy.drools.event.comm.bus;
 
  23 import java.util.ArrayList;
 
  24 import java.util.Arrays;
 
  25 import java.util.HashMap;
 
  26 import java.util.List;
 
  28 import java.util.Properties;
 
  30 import org.slf4j.LoggerFactory;
 
  31 import org.slf4j.Logger;
 
  32 import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
 
  33 import org.openecomp.policy.drools.properties.PolicyProperties;
 
  36  * DMAAP Topic Sink Factory
 
  38 public interface DmaapTopicSinkFactory {
 
  39         public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS";
 
  40         public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT";
 
  41         public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS";
 
  42         public final String DME2_VERSION_PROPERTY = "Version";
 
  43         public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer";
 
  44         public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName";
 
  45         public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath";
 
  46         public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired";
 
  49          * Instantiates a new DMAAP Topic Sink
 
  51          * @param servers list of servers
 
  52          * @param topic topic name
 
  53          * @param apiKey API Key
 
  54          * @param apiSecret API Secret
 
  55          * @param userName AAF user name
 
  56          * @param password AAF password
 
  57          * @param partitionKey Consumer Group
 
  58          * @param environment DME2 environment
 
  59          * @param aftEnvironment DME2 AFT environment
 
  60          * @param partner DME2 Partner
 
  61          * @param latitude DME2 latitude
 
  62          * @param longitude DME2 longitude
 
  63          * @param additionalProps additional properties to pass to DME2
 
  64          * @param managed is this sink endpoint managed?
 
  66          * @return an DMAAP Topic Sink
 
  67          * @throws IllegalArgumentException if invalid parameters are present
 
  69         public DmaapTopicSink build(List<String> servers, 
 
  77                                                                 String aftEnvironment,
 
  81                                                                 Map<String,String> additionalProps,
 
  84                                                                 boolean allowSelfSignedCerts) ;
 
  87          * Instantiates a new DMAAP Topic Sink
 
  89          * @param servers list of servers
 
  90          * @param topic topic name
 
  91          * @param apiKey API Key
 
  92          * @param apiSecret API Secret
 
  93          * @param userName AAF user name
 
  94          * @param password AAF password
 
  95          * @param partitionKey Consumer Group
 
  96          * @param managed is this sink endpoint managed?
 
  98          * @return an DMAAP Topic Sink
 
  99          * @throws IllegalArgumentException if invalid parameters are present
 
 101         public DmaapTopicSink build(List<String> servers, 
 
 110                                                                 boolean allowSelfSignedCerts)
 
 111                         throws IllegalArgumentException;
 
 114          * Creates an DMAAP Topic Sink based on properties files
 
 116          * @param properties Properties containing initialization values
 
 118          * @return an DMAAP Topic Sink
 
 119          * @throws IllegalArgumentException if invalid parameters are present
 
 121         public List<DmaapTopicSink> build(Properties properties)
 
 122                         throws IllegalArgumentException;
 
 125          * Instantiates a new DMAAP Topic Sink
 
 127          * @param servers list of servers
 
 128          * @param topic topic name
 
 130          * @return an DMAAP Topic Sink
 
 131          * @throws IllegalArgumentException if invalid parameters are present
 
 133         public DmaapTopicSink build(List<String> servers, String topic)
 
 134                         throws IllegalArgumentException;
 
 137          * Destroys an DMAAP Topic Sink based on a topic
 
 139          * @param topic topic name
 
 140          * @throws IllegalArgumentException if invalid parameters are present
 
 142         public void destroy(String topic);
 
 145          * gets an DMAAP Topic Sink based on topic name
 
 146          * @param topic the topic name
 
 148          * @return an DMAAP Topic Sink with topic name
 
 149          * @throws IllegalArgumentException if an invalid topic is provided
 
 150          * @throws IllegalStateException if the DMAAP Topic Reader is 
 
 153         public DmaapTopicSink get(String topic)
 
 154                            throws IllegalArgumentException, IllegalStateException;
 
 157          * Provides a snapshot of the DMAAP Topic Sinks
 
 158          * @return a list of the DMAAP Topic Sinks
 
 160         public List<DmaapTopicSink> inventory();
 
 163          * Destroys all DMAAP Topic Sinks
 
 165         public void destroy();
 
 168 /* ------------- implementation ----------------- */
 
 171  * Factory of DMAAP Reader Topics indexed by topic name
 
 173 class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
 
 177         private static Logger logger = LoggerFactory.getLogger(IndexedDmaapTopicSinkFactory.class);     
 
 180          * DMAAP Topic Name Index
 
 182         protected HashMap<String, DmaapTopicSink> dmaapTopicWriters =
 
 183                         new HashMap<String, DmaapTopicSink>();
 
 189         public DmaapTopicSink build(List<String> servers, 
 
 197                                                                 String aftEnvironment,
 
 201                                                                 Map<String,String> additionalProps,
 
 204                                                                 boolean allowSelfSignedCerts) 
 
 205                         throws IllegalArgumentException {
 
 207                 if (topic == null || topic.isEmpty()) {
 
 208                         throw new IllegalArgumentException("A topic must be provided");
 
 211                 synchronized (this) {
 
 212                         if (dmaapTopicWriters.containsKey(topic)) {
 
 213                                 return dmaapTopicWriters.get(topic);
 
 216                         DmaapTopicSink dmaapTopicSink = 
 
 217                                         new InlineDmaapTopicSink(servers, topic, 
 
 221                                                                                      environment, aftEnvironment, 
 
 222                                                                                      partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts);
 
 225                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
 
 226                         return dmaapTopicSink;
 
 234         public DmaapTopicSink build(List<String> servers, 
 
 242                                                                 boolean useHttps, boolean allowSelfSignedCerts) 
 
 243                         throws IllegalArgumentException {
 
 245                 if (topic == null || topic.isEmpty()) {
 
 246                         throw new IllegalArgumentException("A topic must be provided");
 
 249                 synchronized (this) {
 
 250                         if (dmaapTopicWriters.containsKey(topic)) {
 
 251                                 return dmaapTopicWriters.get(topic);
 
 254                         DmaapTopicSink dmaapTopicSink = 
 
 255                                         new InlineDmaapTopicSink(servers, topic, 
 
 258                                                                                      partitionKey, useHttps, allowSelfSignedCerts);
 
 261                                 dmaapTopicWriters.put(topic, dmaapTopicSink);
 
 262                         return dmaapTopicSink;
 
 271         public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
 
 272                 return this.build(servers, topic, null, null, null, null, null, true, false, false);
 
 280         public List<DmaapTopicSink> build(Properties properties) throws IllegalArgumentException {
 
 282                 String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS);
 
 283                 if (writeTopics == null || writeTopics.isEmpty()) {
 
 284                         logger.info("{}: no topic for DMaaP Sink", this);
 
 285                         return new ArrayList<DmaapTopicSink>();
 
 288                 List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
 
 289                 List<DmaapTopicSink> newDmaapTopicSinks = new ArrayList<DmaapTopicSink>();
 
 291                         for (String topic: writeTopicList) {
 
 292                                 if (this.dmaapTopicWriters.containsKey(topic)) {
 
 293                                         newDmaapTopicSinks.add(this.dmaapTopicWriters.get(topic));
 
 296                                 String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + 
 
 298                                                                         PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
 
 300                                 List<String> serverList;
 
 301                                 if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
 
 302                                 else serverList = new ArrayList<>();
 
 304                                 String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
 
 306                                                                                PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);          
 
 307                                 String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
 
 309                                                           PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
 
 311                                 String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
 
 313                                                           PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
 
 314                                 String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
 
 316                                                                             PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
 
 318                                 String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
 
 320                                                              PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
 
 322                                 String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
 
 323                                                                                       PolicyProperties.PROPERTY_MANAGED_SUFFIX);
 
 325                                 /* DME2 Properties */
 
 327                                 String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
 
 328                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
 
 330                                 String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
 
 331                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
 
 333                                 String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
 
 334                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX);
 
 336                                 String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
 
 337                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX);
 
 339                                 String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
 
 340                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
 
 342                                 String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
 
 343                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
 
 345                                 String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
 
 346                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX);
 
 348                                 String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
 
 349                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX);
 
 351                                 String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS
 
 352                                                 + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX);
 
 354                                 String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
 
 355                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX);
 
 357                                 String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "."
 
 358                                                 + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX);
 
 360                                 String dme2SessionStickinessRequired = properties
 
 361                                                 .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic
 
 362                                                                 + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX);
 
 364                                 Map<String,String> dme2AdditionalProps = new HashMap<>();
 
 366                                 if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty())
 
 367                                         dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs);
 
 368                                 if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty())
 
 369                                         dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout);
 
 370                                 if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty())
 
 371                                         dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs);
 
 372                                 if (dme2Version != null && !dme2Version.isEmpty())
 
 373                                         dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version);
 
 374                                 if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty())
 
 375                                         dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
 
 376                                 if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty())
 
 377                                         dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath);
 
 378                                 if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty())
 
 379                                         dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired);
 
 381                                 if (servers == null || servers.isEmpty()) {
 
 382                                         logger.error("{}: no DMaaP servers or DME2 ServiceName provided", this);
 
 386                                 boolean managed = true;
 
 387                                 if (managedString != null && !managedString.isEmpty()) {
 
 388                                         managed = Boolean.parseBoolean(managedString);
 
 391                                 String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
 
 392                                                 PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
 
 394                                 //default is to use HTTP if no https property exists
 
 395                                 boolean useHttps = false;
 
 396                                 if (useHttpsString != null && !useHttpsString.isEmpty()){
 
 397                                         useHttps = Boolean.parseBoolean(useHttpsString);
 
 401                                 String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic +
 
 402                                                 PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
 
 404                                         //default is to disallow self-signed certs 
 
 405                                 boolean allowSelfSignedCerts = false;
 
 406                                 if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){
 
 407                                         allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString);
 
 410                                 DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, 
 
 412                                                                                                            aafMechId, aafPassword,
 
 414                                                                                                            dme2Environment, dme2AftEnvironment,
 
 415                                                                                                            dme2Partner, dme2Latitude, dme2Longitude,
 
 416                                                                                                            dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts);
 
 418                                 newDmaapTopicSinks.add(dmaapTopicSink);
 
 420                         return newDmaapTopicSinks;
 
 428         public void destroy(String topic) 
 
 429                    throws IllegalArgumentException {
 
 431                 if (topic == null || topic.isEmpty()) {
 
 432                         throw new IllegalArgumentException("A topic must be provided");
 
 435                 DmaapTopicSink dmaapTopicWriter;
 
 437                         if (!dmaapTopicWriters.containsKey(topic)) {
 
 441                         dmaapTopicWriter = dmaapTopicWriters.remove(topic);
 
 444                 dmaapTopicWriter.shutdown();
 
 451         public void destroy() {
 
 452                 List<DmaapTopicSink> writers = this.inventory();
 
 453                 for (DmaapTopicSink writer: writers) {
 
 458                         this.dmaapTopicWriters.clear();
 
 466         public DmaapTopicSink get(String topic) 
 
 467                         throws IllegalArgumentException, IllegalStateException {
 
 469                 if (topic == null || topic.isEmpty()) {
 
 470                         throw new IllegalArgumentException("A topic must be provided");
 
 474                         if (dmaapTopicWriters.containsKey(topic)) {
 
 475                                 return dmaapTopicWriters.get(topic);
 
 477                                 throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
 
 486         public synchronized List<DmaapTopicSink> inventory() {
 
 487                  List<DmaapTopicSink> writers = 
 
 488                                  new ArrayList<DmaapTopicSink>(this.dmaapTopicWriters.values());
 
 493         public String toString() {
 
 494                 StringBuilder builder = new StringBuilder();
 
 495                 builder.append("IndexedDmaapTopicSinkFactory []");
 
 496                 return builder.toString();