2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.openecomp.policy.drools.event.comm.bus.internal;
 
  23 import java.util.List;
 
  24 import java.util.UUID;
 
  26 import org.openecomp.policy.drools.event.comm.bus.BusTopicSink;
 
  27 import org.slf4j.LoggerFactory;
 
  28 import org.slf4j.Logger;
 
  31  * Transport Agnostic Bus Topic Sink to carry out the core functionality
 
  32  * to interact with a sink regardless if it is UEB or DMaaP.
 
  35 public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
 
  40         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
 
  41         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
 
  44          * The partition key to publish to
 
  46         protected String partitionId;
 
  49          * message bus publisher
 
  51         protected BusPublisher publisher;
 
  54          * constructor for abstract sink
 
  56          * @param servers servers
 
  58          * @param apiKey api secret
 
  59          * @param apiSecret api secret
 
  60          * @param partitionId partition id
 
  61          * @param useHttps does connection use HTTPS?
 
  62          * @param allowSelfSignedCerts are self-signed certificates allow
 
  63          * @throws IllegalArgumentException in invalid parameters are passed in
 
  65         public InlineBusTopicSink(List<String> servers, String topic, 
 
  66                                           String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts)
 
  67                         throws IllegalArgumentException {
 
  69                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);               
 
  71                 if (partitionId == null || partitionId.isEmpty()) {
 
  72                         this.partitionId = UUID.randomUUID ().toString();
 
  77          * Initialize the Bus publisher
 
  79         public abstract void init();
 
  85         public boolean start() throws IllegalStateException {           
 
  86                 logger.info("{}: starting", this);
 
  94                                 throw new IllegalStateException(this + " is locked.");
 
 107         public boolean stop() {
 
 109                 BusPublisher publisherCopy;
 
 112                         publisherCopy = this.publisher;
 
 113                         this.publisher = null;
 
 116                 if (publisherCopy != null) {
 
 118                                 publisherCopy.close();
 
 119                         } catch (Exception e) {
 
 120                                 logger.warn("{}: cannot stop publisher because of {}", 
 
 121                                                     this, e.getMessage(), e);
 
 124                         logger.warn("{}: there is no publisher", this);
 
 135         public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
 
 137                 if (message == null || message.isEmpty()) {
 
 138                         throw new IllegalArgumentException("Message to send is empty");
 
 142                         throw new IllegalStateException(this + " is stopped");
 
 146                         synchronized (this) {
 
 147                                 this.recentEvents.add(message);
 
 150                         netLogger.info("[OUT|{}|{}]{}{}", this.getTopicCommInfrastructure(), 
 
 151                                        this.topic, System.lineSeparator(), message);
 
 153                         publisher.send(this.partitionId, message);                      
 
 155                 } catch (Exception e) {
 
 156                         logger.warn("{}: cannot send because of {}", this, e.getMessage(), e);
 
 168         public void setPartitionKey(String partitionKey) {
 
 169                 this.partitionId = partitionKey;
 
 176         public String getPartitionKey() {
 
 177                 return this.partitionId;
 
 184         public void shutdown() throws IllegalStateException {
 
 190         public String toString() {
 
 191                 StringBuilder builder = new StringBuilder();
 
 192                 builder.append("InlineBusTopicSink [partitionId=").append(partitionId).append(", alive=").append(alive)
 
 193                                 .append(", publisher=").append(publisher).append("]");
 
 194                 return builder.toString();