/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
 
  21 package org.openecomp.policy.drools.event.comm.bus.internal;
 
import java.util.List;
import java.util.Map;

import org.openecomp.policy.drools.event.comm.Topic;
import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
  32  * This topic reader implementation specializes in reading messages
 
  33  * over DMAAP topic and notifying its listeners
 
  35 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
 
  36                                             implements DmaapTopicSource, Runnable {     
 
  38         private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
 
  40         protected boolean allowSelfSignedCerts;
 
  41         protected final String userName;
 
  42         protected final String password;
 
  44         protected String environment = null;
 
  45         protected String aftEnvironment = null;
 
  46         protected String partner = null;
 
  47         protected String latitude = null;
 
  48         protected String longitude = null;
 
  50         protected Map<String,String> additionalProps = null;
 
  55          * @param servers DMaaP servers
 
  56          * @param topic DMaaP Topic to be monitored
 
  57          * @param apiKey DMaaP API Key (optional)
 
  58          * @param apiSecret DMaaP API Secret (optional)
 
  59          * @param consumerGroup DMaaP Reader Consumer Group
 
  60          * @param consumerInstance DMaaP Reader Instance
 
  61          * @param fetchTimeout DMaaP fetch timeout
 
  62          * @param fetchLimit DMaaP fetch limit
 
  63          * @param environment DME2 Environment
 
  64          * @param aftEnvironment DME2 AFT Environment
 
  65          * @param partner DME2 Partner
 
  66          * @param latitude DME2 Latitude
 
  67          * @param longitude DME2 Longitude
 
  68          * @param additionalProps Additional properties to pass to DME2
 
  69          * @param useHttps does connection use HTTPS?
 
  70          * @param allowSelfSignedCerts are self-signed certificates allow
 
  72          * @throws IllegalArgumentException An invalid parameter passed in
 
  74         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
 
  75                                                                                         String apiKey, String apiSecret,
 
  76                                                                                         String userName, String password,
 
  77                                                                                         String consumerGroup, String consumerInstance,
 
  78                                                                                         int fetchTimeout, int fetchLimit,
 
  79                                                                                         String environment, String aftEnvironment, String partner,
 
  80                                                                                         String latitude, String longitude, Map<String,String> additionalProps,
 
  81                                                                                         boolean useHttps, boolean allowSelfSignedCerts)
 
  82                         throws IllegalArgumentException {
 
  84                 super(servers, topic, apiKey, apiSecret, 
 
  85                           consumerGroup, consumerInstance, 
 
  86                           fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts);
 
  88                 this.userName = userName;
 
  89                 this.password = password;
 
  91                 this.environment = environment;
 
  92                 this.aftEnvironment = aftEnvironment;
 
  93                 this.partner = partner;
 
  95                 this.latitude = latitude;
 
  96                 this.longitude = longitude;
 
  98                 this.additionalProps = additionalProps;
 
 101                 } catch (Exception e) {
 
 102                         logger.error("ERROR during init of topic {}", this.topic);
 
 103                         throw new IllegalArgumentException(e);
 
 109          * @param servers DMaaP servers
 
 110          * @param topic DMaaP Topic to be monitored
 
 111          * @param apiKey DMaaP API Key (optional)
 
 112          * @param apiSecret DMaaP API Secret (optional)
 
 113          * @param consumerGroup DMaaP Reader Consumer Group
 
 114          * @param consumerInstance DMaaP Reader Instance
 
 115          * @param fetchTimeout DMaaP fetch timeout
 
 116          * @param fetchLimit DMaaP fetch limit
 
 117          * @param useHttps does connection use HTTPS?
 
 118          * @param allowSelfSignedCerts are self-signed certificates allow
 
 119          * @throws IllegalArgumentException An invalid parameter passed in
 
 121         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
 
 122                                                       String apiKey, String apiSecret,
 
 123                                                       String userName, String password,
 
 124                                                       String consumerGroup, String consumerInstance, 
 
 125                                                       int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts)
 
 126                         throws IllegalArgumentException {
 
 129                 super(servers, topic, apiKey, apiSecret, 
 
 130                           consumerGroup, consumerInstance, 
 
 131                           fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts);
 
 133                 this.userName = userName;
 
 134                 this.password = password;               
 
 138                 } catch (Exception e) {
 
 139                         logger.warn("dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
 
 140                         throw new IllegalArgumentException(e);
 
 146          * Initialize the Cambria or MR Client
 
 149         public void init() throws Exception {
 
 150                 if (this.userName == null || this.userName.isEmpty() || 
 
 151                                 this.password == null || this.password.isEmpty()) {
 
 153                                                 new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
 
 154                                                                                            this.apiKey, this.apiSecret,
 
 155                                                                                            this.consumerGroup, this.consumerInstance,
 
 156                                                                                            this.fetchTimeout, this.fetchLimit,
 
 157                                                                                            this.useHttps, this.allowSelfSignedCerts);
 
 158                 } else if ((this.environment == null    || this.environment.isEmpty()) &&
 
 159                                    (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) &&
 
 160                                    (this.latitude == null          || this.latitude.isEmpty()) &&
 
 161                                    (this.longitude == null         || this.longitude.isEmpty()) &&
 
 162                                    (this.partner == null           || this.partner.isEmpty())) {
 
 164                                         new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, 
 
 165                                                                                     this.apiKey, this.apiSecret,
 
 166                                                                                     this.userName, this.password,
 
 167                                                                                     this.consumerGroup, this.consumerInstance,
 
 168                                                                                     this.fetchTimeout, this.fetchLimit, this.useHttps);
 
 171                                         new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, 
 
 172                                                                                     this.apiKey, this.apiSecret,
 
 173                                                                                     this.userName, this.password,
 
 174                                                                                     this.consumerGroup, this.consumerInstance,
 
 175                                                                                     this.fetchTimeout, this.fetchLimit,
 
 176                                                                                     this.environment, this.aftEnvironment, this.partner,
 
 177                                                                                     this.latitude, this.longitude, this.additionalProps, this.useHttps);
 
 180                 logger.info("{}: INITTED", this);
 
 187         public CommInfrastructure getTopicCommInfrastructure() {
 
 188                 return Topic.CommInfrastructure.DMAAP;
 
 192         public String toString() {
 
 193                 StringBuilder builder = new StringBuilder();
 
 194                 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
 
 195                                 .append((password == null || password.isEmpty()) ? "-" : password.length())
 
 196                                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
 
 197                                 .append(", toString()=").append(super.toString()).append("]");
 
 198                 return builder.toString();