2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
 
   6  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
 
   7  * ================================================================================
 
   8  * Licensed under the Apache License, Version 2.0 (the "License");
 
   9  * you may not use this file except in compliance with the License.
 
  10  * You may obtain a copy of the License at
 
  12  *      http://www.apache.org/licenses/LICENSE-2.0
 
  14  * Unless required by applicable law or agreed to in writing, software
 
  15  * distributed under the License is distributed on an "AS IS" BASIS,
 
  16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  17  * See the License for the specific language governing permissions and
 
  18  * limitations under the License.
 
  19  * ============LICENSE_END=========================================================
 
  22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
 
  24 import java.util.ArrayList;
 
  25 import java.util.List;
 
  26 import org.apache.commons.collections4.queue.CircularFifoQueue;
 
  27 import org.onap.policy.common.endpoints.event.comm.Topic;
 
  28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
 
  29 import org.slf4j.Logger;
 
  30 import org.slf4j.LoggerFactory;
 
  32 public abstract class TopicBase implements Topic {
 
  37     private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
 
  42     protected List<String> servers;
 
  47     protected final String topic;
 
  52     protected final String effectiveTopic;
 
  57     protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
 
  60      * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
 
  63     protected volatile boolean alive = false;
 
  66      * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
 
  67      * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
 
  68      * obvious since locked => !alive).
 
  70     protected volatile boolean locked = false;
 
  73      * All my subscribers for new message notifications.
 
  75     protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
 
  78      * Instantiates a new Topic Base.
 
  80      * @param servers list of servers
 
  81      * @param topic topic name
 
  83      * @throws IllegalArgumentException if invalid parameters are present
 
  85     protected TopicBase(List<String> servers, String topic) {
 
  86         this(servers, topic, topic);
 
  90      * Instantiates a new Topic Base.
 
  92      * @param servers list of servers
 
  93      * @param topic topic name
 
  95      * @throws IllegalArgumentException if invalid parameters are present
 
  97     protected TopicBase(List<String> servers, String topic, String effectiveTopic) {
 
  99         if (servers == null || servers.isEmpty()) {
 
 100             throw new IllegalArgumentException("Server(s) must be provided");
 
 103         if (topic == null || topic.isEmpty()) {
 
 104             throw new IllegalArgumentException("A Topic must be provided");
 
 107         String effectiveTopicCopy;
 
 108         if (effectiveTopic == null || effectiveTopic.isEmpty()) {
 
 109             effectiveTopicCopy = topic;
 
 111             effectiveTopicCopy = effectiveTopic;
 
 114         this.servers = servers;
 
 116         this.effectiveTopic = effectiveTopicCopy;
 
 120     public void register(TopicListener topicListener) {
 
 122         logger.info("{}: registering {}", this, topicListener);
 
 124         synchronized (this) {
 
 125             if (topicListener == null) {
 
 126                 throw new IllegalArgumentException("TopicListener must be provided");
 
 129             for (TopicListener listener : this.topicListeners) {
 
 130                 if (listener == topicListener) {
 
 135             this.topicListeners.add(topicListener);
 
 140     public void unregister(TopicListener topicListener) {
 
 142         logger.info("{}: unregistering {}", this, topicListener);
 
 144         synchronized (this) {
 
 145             if (topicListener == null) {
 
 146                 throw new IllegalArgumentException("TopicListener must be provided");
 
 149             this.topicListeners.remove(topicListener);
 
 154      * Broadcast event to all listeners.
 
 156      * @param message the event
 
 157      * @return true if all notifications are performed with no error, false otherwise
 
 159     protected boolean broadcast(String message) {
 
 160         List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
 
 163         for (TopicListener topicListener : snapshotListeners) {
 
 165                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
 
 166             } catch (Exception e) {
 
 167                 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
 
 175      * Take a snapshot of current topic listeners.
 
 177      * @return the topic listeners
 
 179     protected synchronized List<TopicListener> snapshotTopicListeners() {
 
 180         @SuppressWarnings("unchecked")
 
 181         List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
 
 186     public boolean lock() {
 
 188         logger.info("{}: locking", this);
 
 190         synchronized (this) {
 
 202     public boolean unlock() {
 
 203         logger.info("{}: unlocking", this);
 
 205         synchronized (this) {
 
 215         } catch (Exception e) {
 
 216             logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
 
 222     public boolean isLocked() {
 
 227     public String getTopic() {
 
 232     public String getEffectiveTopic() {
 
 233         return effectiveTopic;
 
 237     public boolean isAlive() {
 
 242     public List<String> getServers() {
 
 247     public synchronized String[] getRecentEvents() {
 
 248         var events = new String[recentEvents.size()];
 
 249         return recentEvents.toArray(events);
 
 254     public String toString() {
 
 255         return "TopicBase [servers=" + servers
 
 257             + ", effectiveTopic=" + effectiveTopic
 
 258             + ", #recentEvents=" + recentEvents.size()
 
 259             + ", locked=" + locked
 
 260             + ", #topicListeners=" + topicListeners.size()