2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24 import java.util.ArrayList;
25 import java.util.List;
26 import lombok.AccessLevel;
28 import org.apache.commons.collections4.queue.CircularFifoQueue;
29 import org.onap.policy.common.endpoints.event.comm.Topic;
30 import org.onap.policy.common.endpoints.event.comm.TopicListener;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 public abstract class TopicBase implements Topic {
40 private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
45 protected List<String> servers;
50 protected final String topic;
55 protected final String effectiveTopic;
60 protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
63 * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
66 protected volatile boolean alive = false;
69 * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
70 * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
71 * obvious since locked => !alive).
73 protected volatile boolean locked = false;
76 * All my subscribers for new message notifications.
78 @Getter(AccessLevel.NONE)
79 protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
/**
 * Instantiates a new Topic Base, using the topic name as the effective topic as well.
 *
 * @param servers list of servers
 * @param topic topic name
 *
 * @throws IllegalArgumentException if invalid parameters are present
 */
protected TopicBase(List<String> servers, String topic) {
    // Delegate to the three-argument constructor; validation happens there.
    this(servers, topic, topic);
}
94 * Instantiates a new Topic Base.
96 * @param servers list of servers
97 * @param topic topic name
99 * @throws IllegalArgumentException if invalid parameters are present
101 protected TopicBase(List<String> servers, String topic, String effectiveTopic) {
103 if (servers == null || servers.isEmpty()) {
104 throw new IllegalArgumentException("Server(s) must be provided");
107 if (topic == null || topic.isEmpty()) {
108 throw new IllegalArgumentException("A Topic must be provided");
111 String effectiveTopicCopy;
112 if (effectiveTopic == null || effectiveTopic.isEmpty()) {
113 effectiveTopicCopy = topic;
115 effectiveTopicCopy = effectiveTopic;
118 this.servers = servers;
120 this.effectiveTopic = effectiveTopicCopy;
124 public void register(TopicListener topicListener) {
126 logger.info("{}: registering {}", this, topicListener);
128 synchronized (this) {
129 if (topicListener == null) {
130 throw new IllegalArgumentException("TopicListener must be provided");
133 for (TopicListener listener : this.topicListeners) {
134 if (listener == topicListener) {
139 this.topicListeners.add(topicListener);
144 public void unregister(TopicListener topicListener) {
146 logger.info("{}: unregistering {}", this, topicListener);
148 synchronized (this) {
149 if (topicListener == null) {
150 throw new IllegalArgumentException("TopicListener must be provided");
153 this.topicListeners.remove(topicListener);
158 * Broadcast event to all listeners.
160 * @param message the event
161 * @return true if all notifications are performed with no error, false otherwise
163 protected boolean broadcast(String message) {
164 List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
167 for (TopicListener topicListener : snapshotListeners) {
169 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
170 } catch (Exception e) {
171 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
179 * Take a snapshot of current topic listeners.
181 * @return the topic listeners
183 protected synchronized List<TopicListener> snapshotTopicListeners() {
184 @SuppressWarnings("unchecked")
185 List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
190 public boolean lock() {
192 logger.info("{}: locking", this);
194 synchronized (this) {
206 public boolean unlock() {
207 logger.info("{}: unlocking", this);
209 synchronized (this) {
219 } catch (Exception e) {
220 logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
226 public synchronized String[] getRecentEvents() {
227 var events = new String[recentEvents.size()];
228 return recentEvents.toArray(events);
233 public String toString() {
234 return "TopicBase [servers=" + servers
236 + ", effectiveTopic=" + effectiveTopic
237 + ", #recentEvents=" + recentEvents.size()
238 + ", locked=" + locked
239 + ", #topicListeners=" + topicListeners.size()