daeaea134d99d6812e0047cc33b557cec576de64
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.util.ArrayList;
25 import java.util.List;
26 import lombok.AccessLevel;
27 import lombok.Getter;
28 import org.apache.commons.collections4.queue.CircularFifoQueue;
29 import org.onap.policy.common.endpoints.event.comm.Topic;
30 import org.onap.policy.common.endpoints.event.comm.TopicListener;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 @Getter
35 public abstract class TopicBase implements Topic {
36
37     /**
38      * Logger.
39      */
40     private static Logger logger = LoggerFactory.getLogger(TopicBase.class);
41
42     /**
43      * List of servers.
44      */
45     protected List<String> servers;
46
47     /**
48      * Topic.
49      */
50     protected final String topic;
51
52     /**
53      * Topic Alias.
54      */
55     protected final String effectiveTopic;
56
57     /**
58      * Event cache.
59      */
60     protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
61
62     /**
63      * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
64      * !alive.
65      */
66     protected volatile boolean alive = false;
67
68     /**
69      * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
70      * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
71      * obvious since locked => !alive).
72      */
73     protected volatile boolean locked = false;
74
75     /**
76      * All my subscribers for new message notifications.
77      */
78     @Getter(AccessLevel.NONE)
79     protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
80
81     /**
82      * Instantiates a new Topic Base.
83      *
84      * @param servers list of servers
85      * @param topic topic name
86      *
87      * @throws IllegalArgumentException if invalid parameters are present
88      */
89     protected TopicBase(List<String> servers, String topic) {
90         this(servers, topic, topic);
91     }
92
93     /**
94      * Instantiates a new Topic Base.
95      *
96      * @param servers list of servers
97      * @param topic topic name
98      *
99      * @throws IllegalArgumentException if invalid parameters are present
100      */
101     protected TopicBase(List<String> servers, String topic, String effectiveTopic) {
102
103         if (servers == null || servers.isEmpty()) {
104             throw new IllegalArgumentException("Server(s) must be provided");
105         }
106
107         if (topic == null || topic.isEmpty()) {
108             throw new IllegalArgumentException("A Topic must be provided");
109         }
110
111         String effectiveTopicCopy;
112         if (effectiveTopic == null || effectiveTopic.isEmpty()) {
113             effectiveTopicCopy = topic;
114         } else {
115             effectiveTopicCopy = effectiveTopic;
116         }
117
118         this.servers = servers;
119         this.topic = topic;
120         this.effectiveTopic = effectiveTopicCopy;
121     }
122
123     @Override
124     public void register(TopicListener topicListener) {
125
126         logger.info("{}: registering {}", this, topicListener);
127
128         synchronized (this) {
129             if (topicListener == null) {
130                 throw new IllegalArgumentException("TopicListener must be provided");
131             }
132
133             for (TopicListener listener : this.topicListeners) {
134                 if (listener == topicListener) {
135                     return;
136                 }
137             }
138
139             this.topicListeners.add(topicListener);
140         }
141     }
142
143     @Override
144     public void unregister(TopicListener topicListener) {
145
146         logger.info("{}: unregistering {}", this, topicListener);
147
148         synchronized (this) {
149             if (topicListener == null) {
150                 throw new IllegalArgumentException("TopicListener must be provided");
151             }
152
153             this.topicListeners.remove(topicListener);
154         }
155     }
156
157     /**
158      * Broadcast event to all listeners.
159      *
160      * @param message the event
161      * @return true if all notifications are performed with no error, false otherwise
162      */
163     protected boolean broadcast(String message) {
164         List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
165
166         var success = true;
167         for (TopicListener topicListener : snapshotListeners) {
168             try {
169                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
170             } catch (Exception e) {
171                 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
172                 success = false;
173             }
174         }
175         return success;
176     }
177
178     /**
179      * Take a snapshot of current topic listeners.
180      *
181      * @return the topic listeners
182      */
183     protected synchronized List<TopicListener> snapshotTopicListeners() {
184         @SuppressWarnings("unchecked")
185         List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
186         return listeners;
187     }
188
189     @Override
190     public boolean lock() {
191
192         logger.info("{}: locking", this);
193
194         synchronized (this) {
195             if (this.locked) {
196                 return true;
197             }
198
199             this.locked = true;
200         }
201
202         return this.stop();
203     }
204
205     @Override
206     public boolean unlock() {
207         logger.info("{}: unlocking", this);
208
209         synchronized (this) {
210             if (!this.locked) {
211                 return true;
212             }
213
214             this.locked = false;
215         }
216
217         try {
218             return this.start();
219         } catch (Exception e) {
220             logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
221             return false;
222         }
223     }
224
225     @Override
226     public synchronized String[] getRecentEvents() {
227         var events = new String[recentEvents.size()];
228         return recentEvents.toArray(events);
229     }
230
231
232     @Override
233     public String toString() {
234         return "TopicBase [servers=" + servers
235             + ", topic=" + topic
236             + ", effectiveTopic=" + effectiveTopic
237             + ", #recentEvents=" + recentEvents.size()
238             + ", locked=" + locked
239             + ", #topicListeners=" + topicListeners.size()
240             + "]";
241     }
242 }