dca1437bde5890b4cdb1b38025564f4e8a0f4cf4
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import org.apache.commons.collections4.queue.CircularFifoQueue;
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public abstract class TopicBase implements Topic {
33
34     /**
35      * Logger.
36      */
37     private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
38
39     /**
40      * List of servers.
41      */
42     protected List<String> servers;
43
44     /**
45      * Topic.
46      */
47     protected final String topic;
48
49     /**
50      * Topic Alias.
51      */
52     protected final String effectiveTopic;
53
54     /**
55      * Event cache.
56      */
57     protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
58
59     /**
60      * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
61      * !alive.
62      */
63     protected volatile boolean alive = false;
64
65     /**
66      * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
67      * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
68      * obvious since locked => !alive).
69      */
70     protected volatile boolean locked = false;
71
72     /**
73      * All my subscribers for new message notifications.
74      */
75     protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
76
77     /**
78      * Instantiates a new Topic Base.
79      *
80      * @param servers list of servers
81      * @param topic topic name
82      *
83      * @throws IllegalArgumentException if invalid parameters are present
84      */
85     protected TopicBase(List<String> servers, String topic) {
86         this(servers, topic, topic);
87     }
88
89     /**
90      * Instantiates a new Topic Base.
91      *
92      * @param servers list of servers
93      * @param topic topic name
94      *
95      * @throws IllegalArgumentException if invalid parameters are present
96      */
97     protected TopicBase(List<String> servers, String topic, String effectiveTopic) {
98
99         if (servers == null || servers.isEmpty()) {
100             throw new IllegalArgumentException("Server(s) must be provided");
101         }
102
103         if (topic == null || topic.isEmpty()) {
104             throw new IllegalArgumentException("A Topic must be provided");
105         }
106
107         String effectiveTopicCopy;
108         if (effectiveTopic == null || effectiveTopic.isEmpty()) {
109             effectiveTopicCopy = topic;
110         } else {
111             effectiveTopicCopy = effectiveTopic;
112         }
113
114         this.servers = servers;
115         this.topic = topic;
116         this.effectiveTopic = effectiveTopicCopy;
117     }
118
119     @Override
120     public void register(TopicListener topicListener) {
121
122         logger.info("{}: registering {}", this, topicListener);
123
124         synchronized (this) {
125             if (topicListener == null) {
126                 throw new IllegalArgumentException("TopicListener must be provided");
127             }
128
129             for (TopicListener listener : this.topicListeners) {
130                 if (listener == topicListener) {
131                     return;
132                 }
133             }
134
135             this.topicListeners.add(topicListener);
136         }
137     }
138
139     @Override
140     public void unregister(TopicListener topicListener) {
141
142         logger.info("{}: unregistering {}", this, topicListener);
143
144         synchronized (this) {
145             if (topicListener == null) {
146                 throw new IllegalArgumentException("TopicListener must be provided");
147             }
148
149             this.topicListeners.remove(topicListener);
150         }
151     }
152
153     /**
154      * Broadcast event to all listeners.
155      *
156      * @param message the event
157      * @return true if all notifications are performed with no error, false otherwise
158      */
159     protected boolean broadcast(String message) {
160         List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
161
162         var success = true;
163         for (TopicListener topicListener : snapshotListeners) {
164             try {
165                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
166             } catch (Exception e) {
167                 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
168                 success = false;
169             }
170         }
171         return success;
172     }
173
174     /**
175      * Take a snapshot of current topic listeners.
176      *
177      * @return the topic listeners
178      */
179     protected synchronized List<TopicListener> snapshotTopicListeners() {
180         @SuppressWarnings("unchecked")
181         List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
182         return listeners;
183     }
184
185     @Override
186     public boolean lock() {
187
188         logger.info("{}: locking", this);
189
190         synchronized (this) {
191             if (this.locked) {
192                 return true;
193             }
194
195             this.locked = true;
196         }
197
198         return this.stop();
199     }
200
201     @Override
202     public boolean unlock() {
203         logger.info("{}: unlocking", this);
204
205         synchronized (this) {
206             if (!this.locked) {
207                 return true;
208             }
209
210             this.locked = false;
211         }
212
213         try {
214             return this.start();
215         } catch (Exception e) {
216             logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
217             return false;
218         }
219     }
220
221     @Override
222     public boolean isLocked() {
223         return this.locked;
224     }
225
226     @Override
227     public String getTopic() {
228         return topic;
229     }
230
231     @Override
232     public String getEffectiveTopic() {
233         return effectiveTopic;
234     }
235
236     @Override
237     public boolean isAlive() {
238         return this.alive;
239     }
240
241     @Override
242     public List<String> getServers() {
243         return servers;
244     }
245
246     @Override
247     public synchronized String[] getRecentEvents() {
248         var events = new String[recentEvents.size()];
249         return recentEvents.toArray(events);
250     }
251
252
253     @Override
254     public String toString() {
255         return "TopicBase [servers=" + servers
256             + ", topic=" + topic
257             + ", effectiveTopic=" + effectiveTopic
258             + ", #recentEvents=" + recentEvents.size()
259             + ", locked=" + locked
260             + ", #topicListeners=" + topicListeners.size()
261             + "]";
262     }
263 }