2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import java.util.ArrayList;
24 import java.util.List;
26 import org.apache.commons.collections4.queue.CircularFifoQueue;
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 public abstract class TopicBase implements Topic {
37 private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
42 protected List<String> servers;
47 protected String topic;
52 protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
55 * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
58 protected volatile boolean alive = false;
61 * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
62 * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
63 * obvious since locked => !alive)
65 protected volatile boolean locked = false;
68 * All my subscribers for new message notifications
70 protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
73 * Instantiates a new Topic Base
75 * @param servers list of servers
76 * @param topic topic name
78 * @return a Topic Base
79 * @throws IllegalArgumentException if invalid parameters are present
// Validates both constructor arguments and stores the server list.
// Throws IllegalArgumentException when servers is null/empty or when topic is null/empty.
81 public TopicBase(List<String> servers, String topic) {
// Reject a missing or empty server list.
83 if (servers == null || servers.isEmpty()) {
84 throw new IllegalArgumentException("Server(s) must be provided");
// Reject a missing or empty topic name.
87 if (topic == null || topic.isEmpty()) {
88 throw new IllegalArgumentException("A Topic must be provided");
// The caller's list reference is stored directly; no copy is made on this line.
91 this.servers = servers;
// NOTE(review): the lines after this assignment are not visible in this extract -
// presumably the topic name is stored and the constructor closes; confirm against the full file.
// Adds a listener to the subscriber list.
// Throws IllegalArgumentException when topicListener is null.
96 public void register(TopicListener topicListener) {
// Logging happens before validation, so a null listener is still logged.
98 logger.info("{}: registering {}", this, topicListener);
// Validation, the duplicate scan and the insertion all run under this instance's monitor.
100 synchronized (this) {
101 if (topicListener == null) {
102 throw new IllegalArgumentException("TopicListener must be provided");
// Duplicate detection compares references (==), not equals().
105 for (TopicListener listener : this.topicListeners) {
106 if (listener == topicListener) {
// NOTE(review): the statement executed on a match is not visible in this extract -
// presumably it leaves the method without adding the listener again; confirm.
111 this.topicListeners.add(topicListener);
116 public void unregister(TopicListener topicListener) {
118 logger.info("{}: unregistering {}", this, topicListener);
120 synchronized (this) {
121 if (topicListener == null) {
122 throw new IllegalArgumentException("TopicListener must be provided");
125 this.topicListeners.remove(topicListener);
130 * broadcast event to all listeners
132 * @param message the event
133 * @return true if all notifications are performed with no error, false otherwise
// Delivers a message to each listener in a snapshot of the subscriber list.
135 protected boolean broadcast(String message) {
// Iterate over a copy (see snapshotTopicListeners) rather than the live listener list.
136 List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
// Starts optimistic; no line in this extract shows where the flag is changed.
138 boolean success = true;
139 for (TopicListener topicListener : snapshotListeners) {
// Each listener receives the comm infrastructure, the topic name and the message.
141 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
// Any exception thrown by a listener is caught here and logged at warn level with its stack trace.
142 } catch (Exception e) {
143 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
// NOTE(review): the opening try, the rest of the catch handler and the return statement are
// not visible in this extract - presumably the flag is cleared on failure and returned; confirm.
151 * take a snapshot of current topic listeners
153 * @return the topic listeners
// Copies the current listener list; the method is synchronized, so the copy is taken while
// holding this instance's monitor.
155 protected synchronized List<TopicListener> snapshotTopicListeners() {
// ArrayList.clone() is declared to return Object, hence the unchecked cast. The clone is
// shallow: a new list holding the same listener objects.
156 @SuppressWarnings("unchecked")
157 List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
// NOTE(review): the return statement is not visible in this extract - presumably returns `listeners`.
// Lock operation for this topic; returns a boolean result.
162 public boolean lock() {
164 logger.info("{}: locking", this);
// A section guarded by this instance's monitor starts here.
// NOTE(review): the guarded statements and everything after them are not visible in this
// extract - presumably the `locked` flag is set here; confirm against the full file.
166 synchronized (this) {
// Unlock operation for this topic; returns a boolean result.
178 public boolean unlock() {
179 logger.info("{}: unlocking", this);
// A section guarded by this instance's monitor starts here.
// NOTE(review): the guarded statements and the try body that precede the catch below are not
// visible in this extract - presumably the `locked` flag is cleared and the topic restarted; confirm.
181 synchronized (this) {
// Any exception from the (unseen) try body is caught and logged at warn level with its stack trace.
191 } catch (Exception e) {
192 logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
// NOTE(review): the value returned after logging is not visible in this extract.
// Accessor for the lock state.
// NOTE(review): the body is not visible in this extract - presumably returns the `locked` field; confirm.
198 public boolean isLocked() {
// Accessor for the topic name.
// NOTE(review): the body is not visible in this extract - presumably returns the `topic` field; confirm.
203 public String getTopic() {
// Accessor for the running state.
// NOTE(review): the body is not visible in this extract - presumably returns the `alive` field; confirm.
208 public boolean isAlive() {
// Accessor for the server list.
// NOTE(review): the body is not visible in this extract - presumably returns the `servers` field; confirm.
213 public List<String> getServers() {
218 public synchronized String[] getRecentEvents() {
219 String[] events = new String[recentEvents.size()];
220 return recentEvents.toArray(events);
225 public String toString() {
226 return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size()
227 + ", locked=" + locked + ", #topicListeners=" + topicListeners.size() + "]";