c63fbcc2114fc24de449e96156a99eacc0453d32
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
7  * Modifications Copyright (C) 2023 Nordix Foundation.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import java.util.ArrayList;
26 import java.util.List;
27 import lombok.AccessLevel;
28 import lombok.Getter;
29 import org.apache.commons.collections4.queue.CircularFifoQueue;
30 import org.onap.policy.common.endpoints.event.comm.Topic;
31 import org.onap.policy.common.endpoints.event.comm.TopicListener;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 @Getter
36 public abstract class TopicBase implements Topic {
37
38     /**
39      * Logger.
40      */
41     private static final Logger logger = LoggerFactory.getLogger(TopicBase.class);
42
43     /**
44      * List of servers.
45      */
46     protected List<String> servers;
47
48     /**
49      * Topic.
50      */
51     protected final String topic;
52
53     /**
54      * Topic Alias.
55      */
56     protected final String effectiveTopic;
57
58     /**
59      * Event cache.
60      */
61     protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<>(10);
62
63     /**
64      * Am I running? reflects invocation of start()/stop() !locked & start() => alive stop() =>
65      * !alive.
66      */
67     protected volatile boolean alive = false;
68
69     /**
70      * Am I locked? reflects invocation of lock()/unlock() operations locked => !alive (but not in
71      * the other direction necessarily) locked => !offer, !run, !start, !stop (but this last one is
72      * obvious since locked => !alive).
73      */
74     protected volatile boolean locked = false;
75
76     /**
77      * All my subscribers for new message notifications.
78      */
79     @Getter(AccessLevel.NONE)
80     protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
81
82     /**
83      * Instantiates a new Topic Base.
84      *
85      * @param servers list of servers
86      * @param topic topic name
87      *
88      * @throws IllegalArgumentException if invalid parameters are present
89      */
90     protected TopicBase(List<String> servers, String topic) {
91         this(servers, topic, topic);
92     }
93
94     /**
95      * Instantiates a new Topic Base.
96      *
97      * @param servers list of servers
98      * @param topic topic name
99      *
100      * @throws IllegalArgumentException if invalid parameters are present
101      */
102     protected TopicBase(List<String> servers, String topic, String effectiveTopic) {
103
104         if (servers == null || servers.isEmpty()) {
105             throw new IllegalArgumentException("Server(s) must be provided");
106         }
107
108         if (topic == null || topic.isEmpty()) {
109             throw new IllegalArgumentException("A Topic must be provided");
110         }
111
112         String effectiveTopicCopy;
113         if (effectiveTopic == null || effectiveTopic.isEmpty()) {
114             effectiveTopicCopy = topic;
115         } else {
116             effectiveTopicCopy = effectiveTopic;
117         }
118
119         this.servers = servers;
120         this.topic = topic.toLowerCase();
121         this.effectiveTopic = effectiveTopicCopy.toLowerCase();
122     }
123
124     @Override
125     public void register(TopicListener topicListener) {
126
127         logger.info("{}: registering {}", this, topicListener);
128
129         synchronized (this) {
130             if (topicListener == null) {
131                 throw new IllegalArgumentException("TopicListener must be provided");
132             }
133
134             for (TopicListener listener : this.topicListeners) {
135                 if (listener == topicListener) {
136                     return;
137                 }
138             }
139
140             this.topicListeners.add(topicListener);
141         }
142     }
143
144     @Override
145     public void unregister(TopicListener topicListener) {
146
147         logger.info("{}: unregistering {}", this, topicListener);
148
149         synchronized (this) {
150             if (topicListener == null) {
151                 throw new IllegalArgumentException("TopicListener must be provided");
152             }
153
154             this.topicListeners.remove(topicListener);
155         }
156     }
157
158     /**
159      * Broadcast event to all listeners.
160      *
161      * @param message the event
162      * @return true if all notifications are performed with no error, false otherwise
163      */
164     protected boolean broadcast(String message) {
165         List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
166
167         var success = true;
168         for (TopicListener topicListener : snapshotListeners) {
169             try {
170                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
171             } catch (Exception e) {
172                 logger.warn("{}: notification error @ {} because of {}", this, topicListener, e.getMessage(), e);
173                 success = false;
174             }
175         }
176         return success;
177     }
178
179     /**
180      * Take a snapshot of current topic listeners.
181      *
182      * @return the topic listeners
183      */
184     protected synchronized List<TopicListener> snapshotTopicListeners() {
185         @SuppressWarnings("unchecked")
186         List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
187         return listeners;
188     }
189
190     @Override
191     public boolean lock() {
192
193         logger.info("{}: locking", this);
194
195         synchronized (this) {
196             if (this.locked) {
197                 return true;
198             }
199
200             this.locked = true;
201         }
202
203         return this.stop();
204     }
205
206     @Override
207     public boolean unlock() {
208         logger.info("{}: unlocking", this);
209
210         synchronized (this) {
211             if (!this.locked) {
212                 return true;
213             }
214
215             this.locked = false;
216         }
217
218         try {
219             return this.start();
220         } catch (Exception e) {
221             logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
222             return false;
223         }
224     }
225
226     @Override
227     public synchronized String[] getRecentEvents() {
228         var events = new String[recentEvents.size()];
229         return recentEvents.toArray(events);
230     }
231
232
233     @Override
234     public String toString() {
235         return "TopicBase [servers=" + servers
236             + ", topic=" + topic
237             + ", effectiveTopic=" + effectiveTopic
238             + ", #recentEvents=" + recentEvents.size()
239             + ", locked=" + locked
240             + ", #topicListeners=" + topicListeners.size()
241             + "]";
242     }
243 }