2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.openecomp.policy.drools.event.comm.bus.internal;
 
  23 import java.net.MalformedURLException;
 
  24 import java.security.GeneralSecurityException;
 
  25 import java.util.ArrayList;
 
  26 import java.util.List;
 
  28 import java.util.Properties;
 
  29 import java.util.concurrent.TimeUnit;
 
  31 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory;
 
  32 import org.openecomp.policy.drools.properties.PolicyProperties;
 
  33 import org.slf4j.Logger;
 
  34 import org.slf4j.LoggerFactory;
 
  36 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
 
  37 import com.att.nsa.cambria.client.CambriaClientBuilders;
 
  38 import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
 
  39 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
 
  40 import com.att.nsa.mr.client.response.MRPublisherResponse;
 
  41 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 
  42 import com.fasterxml.jackson.annotation.JsonIgnore;
 
  44 public interface BusPublisher {
 
  50          * @param message the message
 
  51          * @return true if success, false otherwise
 
  52          * @throws IllegalArgumentException if no message provided
 
  54         public boolean send(String partitionId, String message) throws IllegalArgumentException;
 
  57          * closes the publisher
 
  62          * Cambria based library publisher
 
  64         public static class CambriaPublisherWrapper implements BusPublisher {
 
  66                 private static Logger logger = LoggerFactory.getLogger(CambriaPublisherWrapper.class);
 
  69                  * The actual Cambria publisher
 
  72                 protected volatile CambriaBatchingPublisher publisher;
 
  74                 public CambriaPublisherWrapper(List<String> servers, String topic,
 
  76                                                                String apiSecret, boolean useHttps) throws IllegalArgumentException {
 
  77                         PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
 
  82                                 builder.usingHosts(servers)
 
  87                                 builder.usingHosts(servers)
 
  92                         if (apiKey != null && !apiKey.isEmpty() &&
 
  93                                 apiSecret != null && !apiSecret.isEmpty()) {
 
  94                                 builder.authenticatedBy(apiKey, apiSecret);
 
  98                                 this.publisher = builder.build();
 
  99                         } catch (MalformedURLException | GeneralSecurityException e) {
 
 100                                 throw new IllegalArgumentException(e);
 
 108                 public boolean send(String partitionId, String message) 
 
 109                                 throws IllegalArgumentException {
 
 111                                 throw new IllegalArgumentException("No message provided");
 
 114                                 this.publisher.send(partitionId, message);
 
 115                         } catch (Exception e) {
 
 116                                 logger.warn("{}: SEND of {} cannot be performed because of {}",
 
 117                                                     this, message, e.getMessage(), e);
 
 127                 public void close() {
 
 128                         logger.info("{}: CLOSE", this);
 
 131                                 this.publisher.close();
 
 132                         } catch (Exception e) {
 
 133                                 logger.warn("{}: CLOSE FAILED because of {}", 
 
 134                                                     this, e.getMessage(),e);
 
 140                 public String toString() {
 
 141                         StringBuilder builder = new StringBuilder();
 
 142                         builder.append("CambriaPublisherWrapper [").
 
 143                         append("publisher.getPendingMessageCount()=").
 
 144                         append(publisher.getPendingMessageCount()).
 
 146                         return builder.toString();
 
 152          * DmaapClient library wrapper
 
 154         public abstract class DmaapPublisherWrapper implements BusPublisher {
 
 156                 private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
 
 161                 protected MRSimplerBatchPublisher publisher;
 
 162                 protected Properties props;
 
 165                  * MR Publisher Wrapper
 
 167                  * @param servers messaging bus hosts
 
 169                  * @param username AAF or DME2 Login
 
 170                  * @param password AAF or DME2 Password
 
 172                 public DmaapPublisherWrapper(ProtocolTypeConstants protocol,
 
 173                                                                          List<String> servers, String topic,
 
 175                                                      String password, boolean useHttps) throws IllegalArgumentException {
 
 178                         if (topic == null || topic.isEmpty())
 
 179                                 throw new IllegalArgumentException("No topic for DMaaP");
 
 182                         if (protocol == ProtocolTypeConstants.AAF_AUTH) {
 
 183                                 if (servers == null || servers.isEmpty())
 
 184                                         throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
 
 186                                 ArrayList<String> dmaapServers = new ArrayList<String>();
 
 188                                         for (String server: servers) {
 
 189                                                 dmaapServers.add(server + ":3905");
 
 194                                         for (String server: servers) {
 
 195                                                 dmaapServers.add(server + ":3904");
 
 201                                         new MRSimplerBatchPublisher.Builder().
 
 202                                                                                                 againstUrls(dmaapServers).
 
 206                                 this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
 
 207                         } else if (protocol == ProtocolTypeConstants.DME2) {
 
 208                                 ArrayList<String> dmaapServers = new ArrayList<String>();
 
 209                                 dmaapServers.add("0.0.0.0:3904");
 
 212                                         new MRSimplerBatchPublisher.Builder().
 
 213                                                                                                 againstUrls(dmaapServers).
 
 217                                 this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
 
 220                         this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
 
 222                         this.publisher.setUsername(username);
 
 223                         this.publisher.setPassword(password);  
 
 225                         props = new Properties();
 
 228                                 props.setProperty("Protocol", "https");
 
 230                                 props.setProperty("Protocol", "http");
 
 233                         props.setProperty("contenttype", "application/json");
 
 234                         props.setProperty("username", username);
 
 235                         props.setProperty("password", password);
 
 237                         props.setProperty("topic", topic);
 
 239                         this.publisher.setProps(props);
 
 241                         if (protocol == ProtocolTypeConstants.AAF_AUTH)
 
 242                                 this.publisher.setHost(servers.get(0));
 
 244                         logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
 
 251                 public void close() {
 
 252                         logger.info("{}: CLOSE", this);
 
 255                                 this.publisher.close(1, TimeUnit.SECONDS);
 
 256                         } catch (Exception e) {
 
 257                                 logger.warn("{}: CLOSE FAILED because of {}",
 
 258                                                     this, e.getMessage(), e);
 
 266                 public boolean send(String partitionId, String message) 
 
 267                                 throws IllegalArgumentException {
 
 269                                 throw new IllegalArgumentException("No message provided");
 
 271                         this.publisher.setPubResponse(new MRPublisherResponse());
 
 272                         this.publisher.send(partitionId, message);
 
 273                         MRPublisherResponse response = this.publisher.sendBatchWithResponse();
 
 274                         if (response != null) {
 
 275                                 logger.debug("DMaaP publisher received {} : {}",
 
 276                                                      response.getResponseCode(), 
 
 277                                                      response.getResponseMessage());
 
 284                 public String toString() {
 
 285                         StringBuilder builder = new StringBuilder();
 
 286                         builder.append("DmaapPublisherWrapper [").
 
 287                         append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
 
 288                         append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
 
 289                         append(", publisher.getHost()=").append(publisher.getHost()).
 
 290                         append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
 
 291                         append(", publisher.getUsername()=").append(publisher.getUsername()).
 
 292                         append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
 
 294                         return builder.toString();
 
 299          * DmaapClient library wrapper
 
 301         public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper {
 
 305                 protected MRSimplerBatchPublisher publisher;
 
 307                 public DmaapAafPublisherWrapper(List<String> servers, String topic,
 
 309                                                      String aafPassword, boolean useHttps) {
 
 311                         super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps);
 
 315         public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper {
 
 316                 public DmaapDmePublisherWrapper(List<String> servers, String topic,
 
 317                                                                                 String username, String password,
 
 318                                                                                 String environment, String aftEnvironment, String dme2Partner,
 
 319                                                                                 String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) {
 
 321                         super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps);
 
 328                         String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
 
 330                          if (environment == null || environment.isEmpty()) {
 
 331                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
 
 332                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
 
 333                         } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
 
 334                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
 
 335                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
 
 336                         } if (latitude == null || latitude.isEmpty()) {
 
 337                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
 
 338                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
 
 339                         } if (longitude == null || longitude.isEmpty()) {
 
 340                                 throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS +
 
 341                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
 
 344                         if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
 
 345                                 throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
 
 346                                                 "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + 
 
 347                                                 PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
 
 350                         String serviceName = servers.get(0);
 
 352                         /* These are required, no defaults */
 
 353                         props.setProperty("Environment", environment);
 
 354                         props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
 
 356                         props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
 
 358                         if (dme2Partner != null)
 
 359                                 props.setProperty("Partner", dme2Partner);
 
 360                         if (dme2RouteOffer != null)
 
 361                                 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
 
 363                         props.setProperty("Latitude", latitude);
 
 364                         props.setProperty("Longitude", longitude);
 
 366                         // ServiceName also a default, found in additionalProps
 
 368                         /* These are optional, will default to these values if not set in optionalProps */
 
 369                         props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
 
 370                         props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
 
 371                         props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
 
 372                         props.setProperty("Version", "1.0");
 
 373                         props.setProperty("SubContextPath", "/");
 
 374                         props.setProperty("sessionstickinessrequired", "no");
 
 376                         /* These should not change */
 
 377                         props.setProperty("TransportType", "DME2");
 
 378                         props.setProperty("MethodType", "POST");
 
 380                         for (String key : additionalProps.keySet()) {
 
 381                                 String value = additionalProps.get(key);
 
 384                                         props.setProperty(key, value);
 
 387                         this.publisher.setProps(props);