2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.openecomp.policy.drools.event.comm.bus.internal;
 
  23 import java.util.List;
 
  26 import org.slf4j.LoggerFactory;
 
  27 import org.slf4j.Logger;
 
  28 import org.openecomp.policy.drools.event.comm.Topic;
 
  29 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
 
  32  * This implementation publishes events for the associated DMAAP topic,
 
  33  * inline with the calling thread.
 
  35 public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
 
  37         protected static Logger logger = 
 
  38                         LoggerFactory.getLogger(InlineDmaapTopicSink.class);
 
  40         protected final String userName;
 
  41         protected final String password;
 
  43         protected String environment = null;
 
  44         protected String aftEnvironment = null;
 
  45         protected String partner = null;
 
  46         protected String latitude = null;
 
  47         protected String longitude = null;
 
  49         protected Map<String,String> additionalProps = null;
 
  53          * @param servers DMaaP servers
 
  54          * @param topic DMaaP Topic to be monitored
 
  55          * @param apiKey DMaaP API Key (optional)
 
  56          * @param apiSecret DMaaP API Secret (optional)
 
  57          * @param consumerGroup DMaaP Reader Consumer Group
 
  58          * @param consumerInstance DMaaP Reader Instance
 
  59          * @param fetchTimeout DMaaP fetch timeout
 
  60          * @param fetchLimit DMaaP fetch limit
 
  61          * @param environment DME2 Environment
 
  62          * @param aftEnvironment DME2 AFT Environment
 
  63          * @param partner DME2 Partner
 
  64          * @param latitude DME2 Latitude
 
  65          * @param longitude DME2 Longitude
 
  66          * @param additionalProps Additional properties to pass to DME2
 
  67          * @param useHttps does connection use HTTPS?
 
  68          * @param allowSelfSignedCerts are self-signed certificates allow
 
  70          * @throws IllegalArgumentException An invalid parameter passed in
 
  72         public InlineDmaapTopicSink(List<String> servers, String topic, 
 
  73                                                                                         String apiKey, String apiSecret,
 
  74                                                                                         String userName, String password,
 
  76                                                                                         String environment, String aftEnvironment, String partner,
 
  77                                                                                         String latitude, String longitude, Map<String,String> additionalProps,
 
  78                                                                                         boolean useHttps, boolean allowSelfSignedCerts)
 
  79                         throws IllegalArgumentException {
 
  81                 super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
 
  83                 this.userName = userName;
 
  84                 this.password = password;
 
  86                 this.environment = environment;
 
  87                 this.aftEnvironment = aftEnvironment;
 
  88                 this.partner = partner;
 
  90                 this.latitude = latitude;
 
  91                 this.longitude = longitude;
 
  93                 this.additionalProps = additionalProps;
 
  96         public InlineDmaapTopicSink(List<String> servers, String topic, 
 
  97                                             String apiKey, String apiSecret,
 
  98                                     String userName, String password,
 
  99                                             String partitionKey, boolean useHttps,  boolean allowSelfSignedCerts) 
 
 100                 throws IllegalArgumentException {
 
 102                 super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
 
 104                 this.userName = userName;
 
 105                 this.password = password;
 
 111                 if ((this.environment == null   || this.environment.isEmpty()) &&
 
 112                    (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
 
 113                    (this.latitude == null               || this.latitude.isEmpty()) &&
 
 114                    (this.longitude == null              || this.longitude.isEmpty()) &&
 
 115                    (this.partner == null                || this.partner.isEmpty())) {
 
 117                                 new BusPublisher.DmaapAafPublisherWrapper(this.servers, 
 
 120                                                                                this.password, this.useHttps);
 
 123                                 new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, 
 
 124                                                                                this.userName, this.password,
 
 125                                                                                this.environment, this.aftEnvironment,
 
 126                                                                                this.partner, this.latitude, this.longitude,
 
 127                                                                                this.additionalProps, this.useHttps);
 
 130                 logger.info("{}: DMAAP SINK created", this);
 
 137         public CommInfrastructure getTopicCommInfrastructure() {
 
 138                 return Topic.CommInfrastructure.DMAAP;
 
 143         public String toString() {
 
 144                 StringBuilder builder = new StringBuilder();
 
 145                 builder.append("InlineDmaapTopicSink [userName=").append(userName).append(", password=").append(password)
 
 146                                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
 
 147                                 .append(super.toString()).append("]");
 
 148                 return builder.toString();