2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import java.util.ArrayList;
24 import java.util.List;
25 import org.apache.commons.collections4.queue.CircularFifoQueue;
26 import org.onap.policy.common.endpoints.event.comm.Topic;
27 import org.onap.policy.common.endpoints.event.comm.TopicListener;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 public abstract class TopicBase implements Topic {
36 private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
41 protected List<String> servers;
46 protected final String topic;
51 protected final String effectiveTopic;
56 protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
59 * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
62 protected volatile boolean alive = false;
65 * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
66 * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
67 * obvious since locked => !alive).
69 protected volatile boolean locked = false;
72 * All my subscribers for new message notifications.
74 protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
77 * Instantiates a new Topic Base.
79 * @param servers list of servers
80 * @param topic topic name
82 * @throws IllegalArgumentException if invalid parameters are present
84 public TopicBase(List<String> servers, String topic) {
85 this(servers, topic, topic);
89 * Instantiates a new Topic Base.
91 * @param servers list of servers
92 * @param topic topic name
94 * @throws IllegalArgumentException if invalid parameters are present
96 public TopicBase(List<String> servers, String topic, String effectiveTopic) {
98 if (servers == null || servers.isEmpty()) {
99 throw new IllegalArgumentException("Server(s) must be provided");
102 if (topic == null || topic.isEmpty()) {
103 throw new IllegalArgumentException("A Topic must be provided");
106 String effectiveTopicCopy;
107 if (effectiveTopic == null || effectiveTopic.isEmpty()) {
108 effectiveTopicCopy = topic;
110 effectiveTopicCopy = effectiveTopic;
113 this.servers = servers;
115 this.effectiveTopic = effectiveTopicCopy;
119 public void register(TopicListener topicListener) {
121 logger.info("{}: registering {}", this, topicListener);
123 synchronized (this) {
124 if (topicListener == null) {
125 throw new IllegalArgumentException("TopicListener must be provided");
128 for (TopicListener listener : this.topicListeners) {
129 if (listener == topicListener) {
134 this.topicListeners.add(topicListener);
139 public void unregister(TopicListener topicListener) {
141 logger.info("{}: unregistering {}", this, topicListener);
143 synchronized (this) {
144 if (topicListener == null) {
145 throw new IllegalArgumentException("TopicListener must be provided");
148 this.topicListeners.remove(topicListener);
153 * Broadcast event to all listeners.
155 * @param message the event
156 * @return true if all notifications are performed with no error, false otherwise
158 protected boolean broadcast(String message) {
159 List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
161 boolean success = true;
162 for (TopicListener topicListener : snapshotListeners) {
164 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
165 } catch (Exception e) {
166 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
174 * Take a snapshot of current topic listeners.
176 * @return the topic listeners
178 protected synchronized List<TopicListener> snapshotTopicListeners() {
179 @SuppressWarnings("unchecked")
180 List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
185 public boolean lock() {
187 logger.info("{}: locking", this);
189 synchronized (this) {
201 public boolean unlock() {
202 logger.info("{}: unlocking", this);
204 synchronized (this) {
214 } catch (Exception e) {
215 logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
221 public boolean isLocked() {
226 public String getTopic() {
231 public String getEffectiveTopic() {
232 return effectiveTopic;
236 public boolean isAlive() {
241 public List<String> getServers() {
246 public synchronized String[] getRecentEvents() {
247 String[] events = new String[recentEvents.size()];
248 return recentEvents.toArray(events);
253 public String toString() {
254 return "TopicBase [servers=" + servers
256 + ", effectiveTopic=" + effectiveTopic
257 + ", #recentEvents=" + recentEvents.size()
258 + ", locked=" + locked
259 + ", #topicListeners=" + topicListeners.size()