2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.openecomp.policy.drools.event.comm.bus.internal;
 
  23 import java.util.ArrayList;
 
  24 import java.util.List;
 
  25 import java.util.UUID;
 
  27 import org.slf4j.Logger;
 
  28 import org.slf4j.LoggerFactory;
 
  30 import org.openecomp.policy.drools.event.comm.TopicListener;
 
  31 import org.openecomp.policy.drools.event.comm.bus.BusTopicSource;
 
  34  * This topic source implementation specializes in reading messages
 
  35  * over a bus topic source and notifying its listeners
 
  37 public abstract class SingleThreadedBusTopicSource 
 
  39        implements Runnable, BusTopicSource {
 
  42          * Not to be converted to PolicyLogger.
 
  43          * This will contain all in/out traffic, and only that, in a single file in a concise format.
 
  45         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
 
  46         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
 
  51         protected final String consumerGroup;
 
  54          * Bus consumer instance
 
  56         protected final String consumerInstance;
 
  61         protected final int fetchTimeout;
 
  66         protected final int fetchLimit;
 
  69          * Message Bus Consumer
 
  71         protected BusConsumer consumer;
 
  75          * reflects invocation of start()/stop() 
 
  76          * !locked & start() => alive
 
  79         protected volatile boolean alive = false;
 
  82          * Independent thread reading messages over my topic
 
  84         protected Thread busPollerThread;
 
  87          * All my subscribers for new message notifications
 
  89         protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
 
  94          * @param servers Bus servers
 
  95          * @param topic Bus Topic to be monitored
 
  96          * @param apiKey Bus API Key (optional)
 
  97          * @param apiSecret Bus API Secret (optional)
 
  98          * @param consumerGroup Bus Reader Consumer Group
 
  99          * @param consumerInstance Bus Reader Instance
 
 100          * @param fetchTimeout Bus fetch timeout
 
 101          * @param fetchLimit Bus fetch limit
 
 102          * @param useHttps does the bus use https
 
 103          * @param allowSelfSignedCerts are self-signed certificates allowed
 
 104          * @throws IllegalArgumentException An invalid parameter passed in
 
 106         public SingleThreadedBusTopicSource(List<String> servers, 
 
 110                                                                 String consumerGroup, 
 
 111                                                                 String consumerInstance,
 
 115                                                                 boolean allowSelfSignedCerts) 
 
 116         throws IllegalArgumentException {
 
 118                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
 
 120                 if (consumerGroup == null || consumerGroup.isEmpty()) {
 
 121                         this.consumerGroup = UUID.randomUUID ().toString();
 
 123                         this.consumerGroup = consumerGroup;
 
 126                 if (consumerInstance == null || consumerInstance.isEmpty()) {
 
 127                         this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
 
 129                         this.consumerInstance = consumerInstance;
 
 132                 if (fetchTimeout <= 0) {
 
 133                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
 
 135                         this.fetchTimeout = fetchTimeout;
 
 138                 if (fetchLimit <= 0) {
 
 139                         this.fetchLimit = NO_LIMIT_FETCH;
 
 141                         this.fetchLimit = fetchLimit;
 
 147          * Initialize the Bus client
 
 149         public abstract void init() throws Exception;
 
 152         public void register(TopicListener topicListener) 
 
 153                 throws IllegalArgumentException {               
 
 155                 super.register(topicListener);
 
 158                         if (!alive && !locked)
 
 161                                 logger.info("{}: register: start not attempted", this);
 
 162                 } catch (Exception e) {
 
 163                         logger.warn("{}: cannot start after registration of because of: {}",
 
 164                                 this, topicListener, e.getMessage(), e);
 
 169         public void unregister(TopicListener topicListener) {
 
 170                 boolean stop = false;
 
 171                 synchronized (this) {
 
 172                         super.unregister(topicListener);
 
 173                         stop = (this.topicListeners.isEmpty());
 
 182         public boolean start() throws IllegalStateException {           
 
 183                 logger.info("{}: starting", this);
 
 191                                 throw new IllegalStateException(this + " is locked.");
 
 193                         if (this.busPollerThread == null || 
 
 194                                 !this.busPollerThread.isAlive() || 
 
 195                                 this.consumer == null) {
 
 200                                         this.busPollerThread = new Thread(this);
 
 201                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
 
 202                                         busPollerThread.start();
 
 203                                 } catch (Exception e) {
 
 204                                         logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
 
 205                                         throw new IllegalStateException(e);
 
 214         public boolean stop() {
 
 215                 logger.info("{}: stopping", this);
 
 218                         BusConsumer consumerCopy = this.consumer;
 
 221                         this.consumer = null;
 
 223                         if (consumerCopy != null) {
 
 225                                         consumerCopy.close();
 
 226                                 } catch (Exception e) {
 
 227                                         logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
 
 238          * Run thread method for the Bus Reader
 
 244                                 for (String event: this.consumer.fetch()) {                                     
 
 245                                         synchronized (this) {
 
 246                                                 this.recentEvents.add(event);
 
 249                                         netLogger.info("[IN|{}|{}]{}{}",
 
 250                                                            this.getTopicCommInfrastructure(), this.topic, 
 
 251                                                            System.lineSeparator(), event);
 
 258                         } catch (Exception e) {
 
 259                                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
 
 263                 logger.info("{}: exiting thread", this);
 
 270         public boolean offer(String event) {
 
 272                         throw new IllegalStateException(this + " is not alive.");
 
 275                 synchronized (this) {
 
 276                         this.recentEvents.add(event);
 
 279                 netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic, 
 
 280                                        System.lineSeparator(), event);
 
 283                 return broadcast(event);
 
 288         public String toString() {
 
 289                 StringBuilder builder = new StringBuilder();
 
 290                 builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
 
 291                                 .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
 
 292                                 .append(", fetchLimit=").append(fetchLimit)
 
 293                                 .append(", consumer=").append(this.consumer).append(", alive=")
 
 294                                 .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
 
 295                                 .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
 
 297                 return builder.toString();
 
 304         public boolean isAlive() {
 
 312         public String getConsumerGroup() {
 
 313                 return consumerGroup;
 
 320         public String getConsumerInstance() {
 
 321                 return consumerInstance;
 
 328         public void shutdown() throws IllegalStateException {
 
 330                 this.topicListeners.clear();
 
 337         public int getFetchTimeout() {
 
 345         public int getFetchLimit() {