2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
7 * Modifications Copyright (C) 2023 Nordix Foundation.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25 import java.util.ArrayList;
26 import java.util.List;
27 import lombok.AccessLevel;
29 import org.apache.commons.collections4.queue.CircularFifoQueue;
30 import org.onap.policy.common.endpoints.event.comm.Topic;
31 import org.onap.policy.common.endpoints.event.comm.TopicListener;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 public abstract class TopicBase implements Topic {
41 private static final Logger logger = LoggerFactory.getLogger(TopicBase.class);
46 protected List<String> servers;
51 protected final String topic;
56 protected final String effectiveTopic;
61 protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
64 * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
67 protected volatile boolean alive = false;
70 * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
71 * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
72 * obvious since locked => !alive).
74 protected volatile boolean locked = false;
77 * All my subscribers for new message notifications.
79 @Getter(AccessLevel.NONE)
80 protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
83 * Instantiates a new Topic Base.
85 * @param servers list of servers
86 * @param topic topic name
88 * @throws IllegalArgumentException if invalid parameters are present
90 protected TopicBase(List<String> servers, String topic) {
91 this(servers, topic, topic);
95 * Instantiates a new Topic Base.
97 * @param servers list of servers
98 * @param topic topic name
100 * @throws IllegalArgumentException if invalid parameters are present
102 protected TopicBase(List<String> servers, String topic, String effectiveTopic) {
104 if (servers == null || servers.isEmpty()) {
105 throw new IllegalArgumentException("Server(s) must be provided");
108 if (topic == null || topic.isEmpty()) {
109 throw new IllegalArgumentException("A Topic must be provided");
112 String effectiveTopicCopy;
113 if (effectiveTopic == null || effectiveTopic.isEmpty()) {
114 effectiveTopicCopy = topic;
116 effectiveTopicCopy = effectiveTopic;
119 this.servers = servers;
121 this.effectiveTopic = effectiveTopicCopy;
125 public void register(TopicListener topicListener) {
127 logger.info("{}: registering {}", this, topicListener);
129 synchronized (this) {
130 if (topicListener == null) {
131 throw new IllegalArgumentException("TopicListener must be provided");
134 for (TopicListener listener : this.topicListeners) {
135 if (listener == topicListener) {
140 this.topicListeners.add(topicListener);
145 public void unregister(TopicListener topicListener) {
147 logger.info("{}: unregistering {}", this, topicListener);
149 synchronized (this) {
150 if (topicListener == null) {
151 throw new IllegalArgumentException("TopicListener must be provided");
154 this.topicListeners.remove(topicListener);
159 * Broadcast event to all listeners.
161 * @param message the event
162 * @return true if all notifications are performed with no error, false otherwise
164 protected boolean broadcast(String message) {
165 List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
168 for (TopicListener topicListener : snapshotListeners) {
170 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
171 } catch (Exception e) {
172 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
180 * Take a snapshot of current topic listeners.
182 * @return the topic listeners
184 protected synchronized List<TopicListener> snapshotTopicListeners() {
185 @SuppressWarnings("unchecked")
186 List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
191 public boolean lock() {
193 logger.info("{}: locking", this);
195 synchronized (this) {
207 public boolean unlock() {
208 logger.info("{}: unlocking", this);
210 synchronized (this) {
220 } catch (Exception e) {
221 logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
227 public synchronized String[] getRecentEvents() {
228 var events = new String[recentEvents.size()];
229 return recentEvents.toArray(events);
234 public String toString() {
235 return "TopicBase [servers=" + servers
237 + ", effectiveTopic=" + effectiveTopic
238 + ", #recentEvents=" + recentEvents.size()
239 + ", locked=" + locked
240 + ", #topicListeners=" + topicListeners.size()