ed15ddf78d31bce41d4dcad3af5c621396204a77
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.collections4.queue.CircularFifoQueue;
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public abstract class TopicBase implements Topic {
33
34     /**
35      * logger
36      */
37     private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
38
39     /**
40      * list of servers
41      */
42     protected List<String> servers;
43
44     /**
45      * Topic
46      */
47     protected String topic;
48
49     /**
50      * event cache
51      */
52     protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
53
54     /**
55      * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
56      * !alive
57      */
58     protected volatile boolean alive = false;
59
60     /**
61      * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
62      * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
63      * obvious since locked => !alive)
64      */
65     protected volatile boolean locked = false;
66
67     /**
68      * All my subscribers for new message notifications
69      */
70     protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
71
72     /**
73      * Instantiates a new Topic Base
74      * 
75      * @param servers list of servers
76      * @param topic topic name
77      * 
78      * @return a Topic Base
79      * @throws IllegalArgumentException if invalid parameters are present
80      */
81     public TopicBase(List<String> servers, String topic) {
82
83         if (servers == null || servers.isEmpty()) {
84             throw new IllegalArgumentException("Server(s) must be provided");
85         }
86
87         if (topic == null || topic.isEmpty()) {
88             throw new IllegalArgumentException("A Topic must be provided");
89         }
90
91         this.servers = servers;
92         this.topic = topic;
93     }
94
95     @Override
96     public void register(TopicListener topicListener) {
97
98         logger.info("{}: registering {}", this, topicListener);
99
100         synchronized (this) {
101             if (topicListener == null) {
102                 throw new IllegalArgumentException("TopicListener must be provided");
103             }
104
105             for (TopicListener listener : this.topicListeners) {
106                 if (listener == topicListener) {
107                     return;
108                 }
109             }
110
111             this.topicListeners.add(topicListener);
112         }
113     }
114
115     @Override
116     public void unregister(TopicListener topicListener) {
117
118         logger.info("{}: unregistering {}", this, topicListener);
119
120         synchronized (this) {
121             if (topicListener == null) {
122                 throw new IllegalArgumentException("TopicListener must be provided");
123             }
124
125             this.topicListeners.remove(topicListener);
126         }
127     }
128
129     /**
130      * broadcast event to all listeners
131      * 
132      * @param message the event
133      * @return true if all notifications are performed with no error, false otherwise
134      */
135     protected boolean broadcast(String message) {
136         List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
137
138         boolean success = true;
139         for (TopicListener topicListener : snapshotListeners) {
140             try {
141                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
142             } catch (Exception e) {
143                 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
144                 success = false;
145             }
146         }
147         return success;
148     }
149
150     /**
151      * take a snapshot of current topic listeners
152      * 
153      * @return the topic listeners
154      */
155     protected synchronized List<TopicListener> snapshotTopicListeners() {
156         @SuppressWarnings("unchecked")
157         List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
158         return listeners;
159     }
160
161     @Override
162     public boolean lock() {
163
164         logger.info("{}: locking", this);
165
166         synchronized (this) {
167             if (this.locked) {
168                 return true;
169             }
170
171             this.locked = true;
172         }
173
174         return this.stop();
175     }
176
177     @Override
178     public boolean unlock() {
179         logger.info("{}: unlocking", this);
180
181         synchronized (this) {
182             if (!this.locked) {
183                 return true;
184             }
185
186             this.locked = false;
187         }
188
189         try {
190             return this.start();
191         } catch (Exception e) {
192             logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
193             return false;
194         }
195     }
196
197     @Override
198     public boolean isLocked() {
199         return this.locked;
200     }
201
202     @Override
203     public String getTopic() {
204         return topic;
205     }
206
207     @Override
208     public boolean isAlive() {
209         return this.alive;
210     }
211
212     @Override
213     public List<String> getServers() {
214         return servers;
215     }
216
217     @Override
218     public synchronized String[] getRecentEvents() {
219         String[] events = new String[recentEvents.size()];
220         return recentEvents.toArray(events);
221     }
222
223
224     @Override
225     public String toString() {
226         return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size()
227                 + ", locked=" + locked + ", #topicListeners=" + topicListeners.size() + "]";
228     }
229 }