85da3f01de85ccb2ee9aec4844de1a268046a349
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import org.openecomp.policy.drools.event.comm.TopicListener;
31 import org.openecomp.policy.drools.event.comm.bus.BusTopicSource;
32
33 /**
34  * This topic source implementation specializes in reading messages
35  * over a bus topic source and notifying its listeners
36  */
37 public abstract class SingleThreadedBusTopicSource 
38        extends BusTopicBase
39        implements Runnable, BusTopicSource {
40            
41         /**
42          * Not to be converted to PolicyLogger.
43          * This will contain all instract /out traffic and only that in a single file in a concise format.
44          */
45         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
46         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
47         
48         /**
49          * Bus consumer group
50          */
51         protected final String consumerGroup;
52         
53         /**
54          * Bus consumer instance
55          */
56         protected final String consumerInstance;
57         
58         /**
59          * Bus fetch timeout
60          */
61         protected final int fetchTimeout;
62         
63         /**
64          * Bus fetch limit
65          */
66         protected final int fetchLimit;
67         
68         /**
69          * Message Bus Consumer
70          */
71         protected BusConsumer consumer;
72         
73         /**
74          * Am I running?
75          * reflects invocation of start()/stop() 
76          * !locked & start() => alive
77          * stop() => !alive
78          */
79         protected volatile boolean alive = false;
80         
81         /**
82          * Am I locked?
83          * reflects invocation of lock()/unlock() operations
84          * locked => !alive (but not in the other direction necessarily)
85          * locked => !offer, !run, !start, !stop (but this last one is obvious
86          *                                        since locked => !alive)
87          */
88         protected volatile boolean locked = false;
89         
90         /**
91          * Independent thread reading message over my topic
92          */
93         protected Thread busPollerThread;
94         
95         /**
96          * All my subscribers for new message notifications
97          */
98         protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
99         
100
101         /**
102          * 
103          * @param servers Bus servers
104          * @param topic Bus Topic to be monitored
105          * @param apiKey Bus API Key (optional)
106          * @param apiSecret Bus API Secret (optional)
107          * @param consumerGroup Bus Reader Consumer Group
108          * @param consumerInstance Bus Reader Instance
109          * @param fetchTimeout Bus fetch timeout
110          * @param fetchLimit Bus fetch limit
111          * @param useHttps does the bus use https
112          * @param allowSelfSignedCerts are self-signed certificates allowed
113          * @throws IllegalArgumentException An invalid parameter passed in
114          */
115         public SingleThreadedBusTopicSource(List<String> servers, 
116                                                                                 String topic, 
117                                                                 String apiKey, 
118                                                                 String apiSecret, 
119                                                                 String consumerGroup, 
120                                                                 String consumerInstance,
121                                                                 int fetchTimeout,
122                                                                 int fetchLimit,
123                                                                 boolean useHttps,
124                                                                 boolean allowSelfSignedCerts) 
125         throws IllegalArgumentException {
126                 
127                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
128                 
129                 if (consumerGroup == null || consumerGroup.isEmpty()) {
130                         this.consumerGroup = UUID.randomUUID ().toString();
131                 } else {
132                         this.consumerGroup = consumerGroup;
133                 }
134                 
135                 if (consumerInstance == null || consumerInstance.isEmpty()) {
136                         this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
137                 } else {
138                         this.consumerInstance = consumerInstance;
139                 }
140                 
141                 if (fetchTimeout <= 0) {
142                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
143                 } else {
144                         this.fetchTimeout = fetchTimeout;
145                 }
146                 
147                 if (fetchLimit <= 0) {
148                         this.fetchLimit = NO_LIMIT_FETCH;
149                 } else {
150                         this.fetchLimit = fetchLimit;
151                 }
152                 
153         }
154         
155         /**
156          * Initialize the Bus client
157          */
158         public abstract void init() throws Exception;
159         
160         /**
161          * {@inheritDoc}
162          */
163         @Override
164         public void register(TopicListener topicListener) 
165                 throws IllegalArgumentException {               
166                 
167                 logger.info("{}: registering {}", this, topicListener);
168                 
169                 synchronized(this) {
170                         if (topicListener == null)
171                                 throw new IllegalArgumentException("TopicListener must be provided");
172                         
173                         /* check that this listener is not registered already */
174                         for (TopicListener listener: this.topicListeners) {
175                                 if (listener == topicListener) {
176                                         // already registered
177                                         return;
178                                 }
179                         }
180                         
181                         this.topicListeners.add(topicListener);
182                 }
183                 
184                 try {
185                         this.start();
186                 } catch (Exception e) {
187                         logger.warn("{}: cannot start after registration of because of: {}",
188                                 this, topicListener, e.getMessage(), e);
189                 }
190         }
191
192         /**
193          * {@inheritDoc}
194          */
195         @Override
196         public void unregister(TopicListener topicListener) {
197                 
198                 logger.info("{}: unregistering {}", this, topicListener);
199                 
200                 boolean stop = false;
201                 synchronized (this) {
202                         if (topicListener == null)
203                                 throw new IllegalArgumentException("TopicListener must be provided");
204                         
205                         this.topicListeners.remove(topicListener);
206                         stop = (this.topicListeners.isEmpty());
207                 }
208                 
209                 if (stop) {             
210                         this.stop();
211                 }
212         }
213         
214         /**
215          * {@inheritDoc}
216          */
217         @Override
218         public boolean lock() { 
219
220                 logger.info("{}: locking", this);
221                 
222                 synchronized (this) {
223                         if (this.locked)
224                                 return true;
225                         
226                         this.locked = true;
227                 }
228                 
229                 return this.stop();
230         }
231
232         /**
233          * {@inheritDoc}
234          */
235         @Override
236         public boolean unlock() {               
237                 logger.info("{}: unlocking", this);
238                 
239                 synchronized(this) {
240                         if (!this.locked)
241                                 return true;
242                         
243                         this.locked = false;
244                 }
245                 
246                 try {
247                         return this.start();
248                 } catch (Exception e) {
249                         logger.warn("{}: cannot after unlocking because of {}", this, e.getMessage(), e);
250                         return false;
251                 }
252         }
253         
254         /**
255          * {@inheritDoc}
256          */
257         @Override
258         public boolean start() throws IllegalStateException {           
259                 logger.info("{}: starting", this);
260                 
261                 synchronized(this) {
262                         
263                         if (alive)
264                                 return true;
265                         
266                         if (locked)
267                                 throw new IllegalStateException(this + " is locked.");
268                         
269                         if (this.busPollerThread == null || 
270                                 !this.busPollerThread.isAlive() || 
271                                 this.consumer == null) {
272                                 
273                                 try {
274                                         this.init();
275                                         this.alive = true;
276                                         this.busPollerThread = new Thread(this);
277                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
278                                         busPollerThread.start();
279                                 } catch (Exception e) {
280                                         logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
281                                         throw new IllegalStateException(e);
282                                 }
283                         }
284                 }
285                 
286                 return this.alive;
287         }
288
289         /**
290          * {@inheritDoc}
291          */
292         @Override
293         public boolean stop() {
294                 logger.info("{}: stopping", this);
295                 
296                 synchronized(this) {
297                         BusConsumer consumerCopy = this.consumer;
298                         
299                         this.alive = false;
300                         this.consumer = null;
301                         
302                         if (consumerCopy != null) {
303                                 try {
304                                         consumerCopy.close();
305                                 } catch (Exception e) {
306                                         logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
307                                 }
308                         }
309                 }
310                                                         
311                 Thread.yield();
312                                 
313                 return true;
314         }
315
316         /**
317          * {@inheritDoc}
318          */
319         @Override
320         public boolean isLocked() {
321                 return this.locked;
322         }
323         
324         /**
325          * broadcast event to all listeners
326          * 
327          * @param message the event
328          * @return true if all notifications are performed with no error, false otherwise
329          */
330         protected boolean broadcast(String message) {
331                 
332                 /* take a snapshot of listeners */
333                 List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
334                 
335                 boolean success = true;
336                 for (TopicListener topicListener: snapshotListeners) {
337                         try {
338                                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
339                         } catch (Exception e) {
340                                 logger.warn("{}: notification error @ {} because of {}", 
341                                                     this, topicListener, e.getMessage(), e);
342                                 success = false;
343                         }
344                 }
345                 return success;
346         }
347         
348         /**
349          * take a snapshot of current topic listeners
350          * 
351          * @return the topic listeners
352          */
353         protected synchronized List<TopicListener> snapshotTopicListeners() {
354                 @SuppressWarnings("unchecked")
355                 List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
356                 return listeners;
357         }
358         
359         /**
360          * Run thread method for the Bus Reader
361          */
362         @Override
363         public void run() {
364                 while (this.alive) {
365                         try {
366                                 for (String event: this.consumer.fetch()) {                                     
367                                         synchronized (this) {
368                                                 this.recentEvents.add(event);
369                                         }
370                                         
371                                         netLogger.info("[IN|{}|{}]{}{}",
372                                                            this.getTopicCommInfrastructure(), this.topic, 
373                                                            System.lineSeparator(), event);
374                                         
375                                         broadcast(event);
376                                         
377                                         if (!this.alive)
378                                                 break;
379                                 }
380                         } catch (Exception e) {
381                                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
382                         }
383                 }
384                 
385                 logger.info("{}: exiting thread", this);
386         }
387         
388         /**
389          * {@inheritDoc}
390          */
391         @Override
392         public boolean offer(String event) {
393                 if (!this.alive) {
394                         throw new IllegalStateException(this + " is not alive.");
395                 }
396                 
397                 synchronized (this) {
398                         this.recentEvents.add(event);
399                 }
400                 
401                 netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic, 
402                                        System.lineSeparator(), event);
403                 
404                 
405                 return broadcast(event);
406         }
407         
408
409         @Override
410         public String toString() {
411                 StringBuilder builder = new StringBuilder();
412                 builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
413                                 .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
414                                 .append(", fetchLimit=").append(fetchLimit)
415                                 .append(", consumer=").append(this.consumer).append(", alive=")
416                                 .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
417                                 .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
418                                 .append("]");
419                 return builder.toString();
420         }
421
422         /**
423          * {@inheritDoc}
424          */
425         @Override
426         public boolean isAlive() {
427                 return alive;
428         }
429
430         /**
431          * {@inheritDoc}
432          */
433         @Override
434         public String getConsumerGroup() {
435                 return consumerGroup;
436         }
437
438         /**
439          * {@inheritDoc}
440          */
441         @Override
442         public String getConsumerInstance() {
443                 return consumerInstance;
444         }
445
446         /**
447          * {@inheritDoc}
448          */
449         @Override
450         public void shutdown() throws IllegalStateException {
451                 this.stop();
452                 this.topicListeners.clear();
453         }
454         
455         /**
456          * {@inheritDoc}
457          */
458         @Override
459         public int getFetchTimeout() {
460                 return fetchTimeout;
461         }
462
463         /**
464          * {@inheritDoc}
465          */
466         @Override
467         public int getFetchLimit() {
468                 return fetchLimit;
469         }
470
471 }