8066455459109e775c120a7a4fcadad26773bc6f
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.collections4.queue.CircularFifoQueue;
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public abstract class TopicBase implements Topic {
33
34     /**
35      * Logger.
36      */
37     private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
38
39     /**
40      * List of servers.
41      */
42     protected List<String> servers;
43
44     /**
45      * Topic.
46      */
47     protected String topic;
48
49     /**
50      * Event cache.
51      */
52     protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
53
54     /**
55      * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
56      * !alive.
57      */
58     protected volatile boolean alive = false;
59
60     /**
61      * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
62      * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
63      * obvious since locked => !alive).
64      */
65     protected volatile boolean locked = false;
66
67     /**
68      * All my subscribers for new message notifications.
69      */
70     protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
71
72     /**
73      * Instantiates a new Topic Base.
74      * 
75      * @param servers list of servers
76      * @param topic topic name
77      * 
78      * @throws IllegalArgumentException if invalid parameters are present
79      */
80     public TopicBase(List<String> servers, String topic) {
81
82         if (servers == null || servers.isEmpty()) {
83             throw new IllegalArgumentException("Server(s) must be provided");
84         }
85
86         if (topic == null || topic.isEmpty()) {
87             throw new IllegalArgumentException("A Topic must be provided");
88         }
89
90         this.servers = servers;
91         this.topic = topic;
92     }
93
94     @Override
95     public void register(TopicListener topicListener) {
96
97         logger.info("{}: registering {}", this, topicListener);
98
99         synchronized (this) {
100             if (topicListener == null) {
101                 throw new IllegalArgumentException("TopicListener must be provided");
102             }
103
104             for (TopicListener listener : this.topicListeners) {
105                 if (listener == topicListener) {
106                     return;
107                 }
108             }
109
110             this.topicListeners.add(topicListener);
111         }
112     }
113
114     @Override
115     public void unregister(TopicListener topicListener) {
116
117         logger.info("{}: unregistering {}", this, topicListener);
118
119         synchronized (this) {
120             if (topicListener == null) {
121                 throw new IllegalArgumentException("TopicListener must be provided");
122             }
123
124             this.topicListeners.remove(topicListener);
125         }
126     }
127
128     /**
129      * Broadcast event to all listeners.
130      * 
131      * @param message the event
132      * @return true if all notifications are performed with no error, false otherwise
133      */
134     protected boolean broadcast(String message) {
135         List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
136
137         boolean success = true;
138         for (TopicListener topicListener : snapshotListeners) {
139             try {
140                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
141             } catch (Exception e) {
142                 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
143                 success = false;
144             }
145         }
146         return success;
147     }
148
149     /**
150      * Take a snapshot of current topic listeners.
151      * 
152      * @return the topic listeners
153      */
154     protected synchronized List<TopicListener> snapshotTopicListeners() {
155         @SuppressWarnings("unchecked")
156         List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
157         return listeners;
158     }
159
160     @Override
161     public boolean lock() {
162
163         logger.info("{}: locking", this);
164
165         synchronized (this) {
166             if (this.locked) {
167                 return true;
168             }
169
170             this.locked = true;
171         }
172
173         return this.stop();
174     }
175
176     @Override
177     public boolean unlock() {
178         logger.info("{}: unlocking", this);
179
180         synchronized (this) {
181             if (!this.locked) {
182                 return true;
183             }
184
185             this.locked = false;
186         }
187
188         try {
189             return this.start();
190         } catch (Exception e) {
191             logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
192             return false;
193         }
194     }
195
196     @Override
197     public boolean isLocked() {
198         return this.locked;
199     }
200
201     @Override
202     public String getTopic() {
203         return topic;
204     }
205
206     @Override
207     public boolean isAlive() {
208         return this.alive;
209     }
210
211     @Override
212     public List<String> getServers() {
213         return servers;
214     }
215
216     @Override
217     public synchronized String[] getRecentEvents() {
218         String[] events = new String[recentEvents.size()];
219         return recentEvents.toArray(events);
220     }
221
222
223     @Override
224     public String toString() {
225         return "TopicBase [servers=" + servers + ", topic=" + topic + ", #recentEvents=" + recentEvents.size()
226                 + ", locked=" + locked + ", #topicListeners=" + topicListeners.size() + "]";
227     }
228 }