2e0a9a4692a5a6b376c42e7b0bf42043f66ff07d
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import org.apache.commons.collections4.queue.CircularFifoQueue;
26 import org.onap.policy.common.endpoints.event.comm.Topic;
27 import org.onap.policy.common.endpoints.event.comm.TopicListener;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 public abstract class TopicBase implements Topic {
32
33     /**
34      * Logger.
35      */
36     private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
37
38     /**
39      * List of servers.
40      */
41     protected List<String> servers;
42
43     /**
44      * Topic.
45      */
46     protected final String topic;
47
48     /**
49      * Topic Alias.
50      */
51     protected final String effectiveTopic;
52
53     /**
54      * Event cache.
55      */
56     protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
57
58     /**
59      * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
60      * !alive.
61      */
62     protected volatile boolean alive = false;
63
64     /**
65      * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
66      * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
67      * obvious since locked => !alive).
68      */
69     protected volatile boolean locked = false;
70
71     /**
72      * All my subscribers for new message notifications.
73      */
74     protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
75
76     /**
77      * Instantiates a new Topic Base.
78      *
79      * @param servers list of servers
80      * @param topic topic name
81      *
82      * @throws IllegalArgumentException if invalid parameters are present
83      */
84     public TopicBase(List<String> servers, String topic) {
85         this(servers, topic, topic);
86     }
87
88     /**
89      * Instantiates a new Topic Base.
90      *
91      * @param servers list of servers
92      * @param topic topic name
93      *
94      * @throws IllegalArgumentException if invalid parameters are present
95      */
96     public TopicBase(List<String> servers, String topic, String effectiveTopic) {
97
98         if (servers == null || servers.isEmpty()) {
99             throw new IllegalArgumentException("Server(s) must be provided");
100         }
101
102         if (topic == null || topic.isEmpty()) {
103             throw new IllegalArgumentException("A Topic must be provided");
104         }
105
106         String effectiveTopicCopy;
107         if (effectiveTopic == null || effectiveTopic.isEmpty()) {
108             effectiveTopicCopy = topic;
109         } else {
110             effectiveTopicCopy = effectiveTopic;
111         }
112
113         this.servers = servers;
114         this.topic = topic;
115         this.effectiveTopic = effectiveTopicCopy;
116     }
117
118     @Override
119     public void register(TopicListener topicListener) {
120
121         logger.info("{}: registering {}", this, topicListener);
122
123         synchronized (this) {
124             if (topicListener == null) {
125                 throw new IllegalArgumentException("TopicListener must be provided");
126             }
127
128             for (TopicListener listener : this.topicListeners) {
129                 if (listener == topicListener) {
130                     return;
131                 }
132             }
133
134             this.topicListeners.add(topicListener);
135         }
136     }
137
138     @Override
139     public void unregister(TopicListener topicListener) {
140
141         logger.info("{}: unregistering {}", this, topicListener);
142
143         synchronized (this) {
144             if (topicListener == null) {
145                 throw new IllegalArgumentException("TopicListener must be provided");
146             }
147
148             this.topicListeners.remove(topicListener);
149         }
150     }
151
152     /**
153      * Broadcast event to all listeners.
154      *
155      * @param message the event
156      * @return true if all notifications are performed with no error, false otherwise
157      */
158     protected boolean broadcast(String message) {
159         List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
160
161         boolean success = true;
162         for (TopicListener topicListener : snapshotListeners) {
163             try {
164                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
165             } catch (Exception e) {
166                 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
167                 success = false;
168             }
169         }
170         return success;
171     }
172
173     /**
174      * Take a snapshot of current topic listeners.
175      *
176      * @return the topic listeners
177      */
178     protected synchronized List<TopicListener> snapshotTopicListeners() {
179         @SuppressWarnings("unchecked")
180         List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
181         return listeners;
182     }
183
184     @Override
185     public boolean lock() {
186
187         logger.info("{}: locking", this);
188
189         synchronized (this) {
190             if (this.locked) {
191                 return true;
192             }
193
194             this.locked = true;
195         }
196
197         return this.stop();
198     }
199
200     @Override
201     public boolean unlock() {
202         logger.info("{}: unlocking", this);
203
204         synchronized (this) {
205             if (!this.locked) {
206                 return true;
207             }
208
209             this.locked = false;
210         }
211
212         try {
213             return this.start();
214         } catch (Exception e) {
215             logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
216             return false;
217         }
218     }
219
220     @Override
221     public boolean isLocked() {
222         return this.locked;
223     }
224
225     @Override
226     public String getTopic() {
227         return topic;
228     }
229
230     @Override
231     public String getEffectiveTopic() {
232         return effectiveTopic;
233     }
234
235     @Override
236     public boolean isAlive() {
237         return this.alive;
238     }
239
240     @Override
241     public List<String> getServers() {
242         return servers;
243     }
244
245     @Override
246     public synchronized String[] getRecentEvents() {
247         String[] events = new String[recentEvents.size()];
248         return recentEvents.toArray(events);
249     }
250
251
252     @Override
253     public String toString() {
254         return "TopicBase [servers=" + servers
255             + ", topic=" + topic
256             + ", effectiveTopic=" + effectiveTopic
257             + ", #recentEvents=" + recentEvents.size()
258             + ", locked=" + locked
259             + ", #topicListeners=" + topicListeners.size()
260             + "]";
261     }
262 }