/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import java.util.ArrayList;
24 import java.util.List;
26 import org.apache.commons.collections4.queue.CircularFifoQueue;
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 public abstract class TopicBase implements Topic {
37 private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
42 protected List<String> servers;
47 protected String topic;
52 protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
55 * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
58 protected volatile boolean alive = false;
61 * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
62 * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
63 * obvious since locked => !alive).
65 protected volatile boolean locked = false;
68 * All my subscribers for new message notifications.
70 protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
73 * Instantiates a new Topic Base.
75 * @param servers list of servers
76 * @param topic topic name
78 * @throws IllegalArgumentException if invalid parameters are present
80 public TopicBase(List<String> servers, String topic) {
82 if (servers == null || servers.isEmpty()) {
83 throw new IllegalArgumentException("Server(s) must be provided");
86 if (topic == null || topic.isEmpty()) {
87 throw new IllegalArgumentException("A Topic must be provided");
90 this.servers = servers;
95 public void register(TopicListener topicListener) {
97 logger.info("{}: registering {}", this, topicListener);
100 if (topicListener == null) {
101 throw new IllegalArgumentException("TopicListener must be provided");
104 for (TopicListener listener : this.topicListeners) {
105 if (listener == topicListener) {
110 this.topicListeners.add(topicListener);
115 public void unregister(TopicListener topicListener) {
117 logger.info("{}: unregistering {}", this, topicListener);
119 synchronized (this) {
120 if (topicListener == null) {
121 throw new IllegalArgumentException("TopicListener must be provided");
124 this.topicListeners.remove(topicListener);
129 * Broadcast event to all listeners.
131 * @param message the event
132 * @return true if all notifications are performed with no error, false otherwise
134 protected boolean broadcast(String message) {
135 List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
137 boolean success = true;
138 for (TopicListener topicListener : snapshotListeners) {
140 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
141 } catch (Exception e) {
142 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
150 * Take a snapshot of current topic listeners.
152 * @return the topic listeners
154 protected synchronized List<TopicListener> snapshotTopicListeners() {
155 @SuppressWarnings("unchecked")
156 List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
161 public boolean lock() {
163 logger.info("{}: locking", this);
165 synchronized (this) {
177 public boolean unlock() {
178 logger.info("{}: unlocking", this);
180 synchronized (this) {
190 } catch (Exception e) {
191 logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
197 public boolean isLocked() {
202 public String getTopic() {
207 public boolean isAlive() {
212 public List<String> getServers() {
217 public synchronized String[] getRecentEvents() {
218 String[] events = new String[recentEvents.size()];
219 return recentEvents.toArray(events);
224 public String toString() {
225 return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size()
226 + ", locked=" + locked + ", #topicListeners=" + topicListeners.size() + "]";