f37c349efbddacf25a35046878bc22ccf43e4e19
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.apache.log4j.Logger;
28
29 import org.openecomp.policy.drools.event.comm.TopicListener;
30 import org.openecomp.policy.drools.event.comm.bus.BusTopicSource;
31 import org.openecomp.policy.common.logging.eelf.MessageCodes;
32 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
33
34 /**
35  * This topic source implementation specializes in reading messages
36  * over a bus topic source and notifying its listeners
37  */
38 public abstract class SingleThreadedBusTopicSource 
39        extends BusTopicBase
40        implements Runnable, BusTopicSource {
41            
42         private String className = SingleThreadedBusTopicSource.class.getName();
43         /**
44          * Not to be converted to PolicyLogger.
45          * This will contain all in/out traffic, and only that, in a single file in a concise format.
46          */
47         protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER);
48         
49         /**
50          * Bus consumer group
51          */
52         protected final String consumerGroup;
53         
54         /**
55          * Bus consumer instance
56          */
57         protected final String consumerInstance;
58         
59         /**
60          * Bus fetch timeout
61          */
62         protected final int fetchTimeout;
63         
64         /**
65          * Bus fetch limit
66          */
67         protected final int fetchLimit;
68         
69         /**
70          * Message Bus Consumer
71          */
72         protected BusConsumer consumer;
73         
74         /**
75          * Am I running?
76          * reflects invocation of start()/stop() 
77          * !locked & start() => alive
78          * stop() => !alive
79          */
80         protected volatile boolean alive = false;
81         
82         /**
83          * Am I locked?
84          * reflects invocation of lock()/unlock() operations
85          * locked => !alive (but not in the other direction necessarily)
86          * locked => !offer, !run, !start, !stop (but this last one is obvious
87          *                                        since locked => !alive)
88          */
89         protected volatile boolean locked = false;
90         
91         /**
92          * Independent thread reading message over my topic
93          */
94         protected Thread busPollerThread;
95         
96         /**
97          * All my subscribers for new message notifications
98          */
99         protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
100         
101         /**
102          * 
103          * @param servers Bus servers
104          * @param topic Bus Topic to be monitored
105          * @param apiKey Bus API Key (optional)
106          * @param apiSecret Bus API Secret (optional)
107          * @param consumerGroup Bus Reader Consumer Group
108          * @param consumerInstance Bus Reader Instance
109          * @param fetchTimeout Bus fetch timeout
110          * @param fetchLimit Bus fetch limit
111          * @throws IllegalArgumentException An invalid parameter passed in
112          */
113         public SingleThreadedBusTopicSource(List<String> servers, 
114                                                                                 String topic, 
115                                                                 String apiKey, 
116                                                                 String apiSecret, 
117                                                                 String consumerGroup, 
118                                                                 String consumerInstance,
119                                                                 int fetchTimeout,
120                                                                 int fetchLimit) 
121         throws IllegalArgumentException {
122                 
123                 super(servers, topic, apiKey, apiSecret);
124                 
125                 if (consumerGroup == null || consumerGroup.isEmpty()) {
126                         this.consumerGroup = UUID.randomUUID ().toString();
127                 } else {
128                         this.consumerGroup = consumerGroup;
129                 }
130                 
131                 if (consumerInstance == null || consumerInstance.isEmpty()) {
132                         this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
133                 } else {
134                         this.consumerInstance = consumerInstance;
135                 }
136                 
137                 if (fetchTimeout <= 0) {
138                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
139                 } else {
140                         this.fetchTimeout = fetchTimeout;
141                 }
142                 
143                 if (fetchLimit <= 0) {
144                         this.fetchLimit = NO_LIMIT_FETCH;
145                 } else {
146                         this.fetchLimit = fetchLimit;
147                 }
148         }
149         
150         /**
151          * Initialize the Bus client
152          */
153         public abstract void init() throws Exception;
154         
155         /**
156          * {@inheritDoc}
157          */
158         @Override
159         public void register(TopicListener topicListener) 
160                 throws IllegalArgumentException {               
161                 
162                 PolicyLogger.info(className,"REGISTER: " + topicListener + " INTO " + this);
163                 
164                 synchronized(this) {
165                         if (topicListener == null)
166                                 throw new IllegalArgumentException("TopicListener must be provided");
167                         
168                         /* check that this listener is not registered already */
169                         for (TopicListener listener: this.topicListeners) {
170                                 if (listener == topicListener) {
171                                         // already registered
172                                         return;
173                                 }
174                         }
175                         
176                         this.topicListeners.add(topicListener);
177                 }
178                 
179                 try {
180                         this.start();
181                 } catch (Exception e) {
182                         PolicyLogger.info(className, "new registration of " + topicListener +  
183                                                   ",but can't start source because of " + e.getMessage());
184                 }
185         }
186
187         /**
188          * {@inheritDoc}
189          */
190         @Override
191         public void unregister(TopicListener topicListener) {
192                 
193                 PolicyLogger.info(className, "UNREGISTER: " + topicListener + " FROM " + this);
194                 
195                 boolean stop = false;
196                 synchronized (this) {
197                         if (topicListener == null)
198                                 throw new IllegalArgumentException("TopicListener must be provided");
199                         
200                         this.topicListeners.remove(topicListener);
201                         stop = (this.topicListeners.isEmpty());
202                 }
203                 
204                 if (stop) {             
205                         this.stop();
206                 }
207         }
208         
209         /**
210          * {@inheritDoc}
211          */
212         @Override
213         public boolean lock() { 
214                 PolicyLogger.info(className, "LOCK: " + this);
215                 
216                 synchronized (this) {
217                         if (this.locked)
218                                 return true;
219                         
220                         this.locked = true;
221                 }
222                 
223                 return this.stop();
224         }
225
226         /**
227          * {@inheritDoc}
228          */
229         @Override
230         public boolean unlock() {
231                 PolicyLogger.info(className, "UNLOCK: " + this);
232                 
233                 synchronized(this) {
234                         if (!this.locked)
235                                 return true;
236                         
237                         this.locked = false;
238                 }
239                 
240                 try {
241                         return this.start();
242                 } catch (Exception e) {
243                         PolicyLogger.warn("can't start after unlocking " + this + 
244                                                   " because of " + e.getMessage());
245                         return false;
246                 }
247         }
248         
249         /**
250          * {@inheritDoc}
251          */
252         @Override
253         public boolean start() throws IllegalStateException {
254                 
255                 PolicyLogger.info(className, "START: " + this);
256                 
257                 synchronized(this) {
258                         
259                         if (alive) {
260                                 return true;
261                         }
262                         
263                         if (locked) {
264                                 throw new IllegalStateException(this + " is locked.");
265                         }
266                         
267                         if (this.busPollerThread == null || 
268                                 !this.busPollerThread.isAlive() || 
269                                 this.consumer == null) {
270                                 
271                                 try {
272                                         this.init();
273                                         this.alive = true;
274                                         this.busPollerThread = new Thread(this);
275                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
276                                         busPollerThread.start();
277                                 } catch (Exception e) {
278                                         e.printStackTrace();
279                                         throw new IllegalStateException(e);
280                                 }
281                         }
282                 }
283                 
284                 return this.alive;
285         }
286
287         /**
288          * {@inheritDoc}
289          */
290         @Override
291         public boolean stop() {
292                 PolicyLogger.info(className, "STOP: " + this);
293                 
294                 synchronized(this) {
295                         BusConsumer consumerCopy = this.consumer;
296                         
297                         this.alive = false;
298                         this.consumer = null;
299                         
300                         if (consumerCopy != null) {
301                                 try {
302                                         consumerCopy.close();
303                                 } catch (Exception e) {
304                                         PolicyLogger.warn(MessageCodes.EXCEPTION_ERROR, e, "CONSUMER.CLOSE", this.toString());
305                                 }
306                         }
307                 }
308                                                         
309                 Thread.yield();
310                                 
311                 return true;
312         }
313
314         /**
315          * {@inheritDoc}
316          */
317         @Override
318         public boolean isLocked() {
319                 return this.locked;
320         }
321         
322         /**
323          * broadcast event to all listeners
324          * 
325          * @param message the event
326          * @return true if all notifications are performed with no error, false otherwise
327          */
328         protected boolean broadcast(String message) {
329                 
330                 /* take a snapshot of listeners */
331                 List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
332                 
333                 boolean success = true;
334                 for (TopicListener topicListener: snapshotListeners) {
335                         try {
336                                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
337                         } catch (Exception e) {
338                                 PolicyLogger.warn(this.className, "ERROR notifying " + topicListener.toString() + 
339                                                           " because of " + e.getMessage() + " @ " + this.toString());
340                                 success = false;
341                         }
342                 }
343                 return success;
344         }
345         
346         /**
347          * take a snapshot of current topic listeners
348          * 
349          * @return the topic listeners
350          */
351         protected synchronized List<TopicListener> snapshotTopicListeners() {
352                 @SuppressWarnings("unchecked")
353                 List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
354                 return listeners;
355         }
356         
357         /**
358          * Run thread method for the Bus Reader
359          */
360         @Override
361         public void run() {
362                 while (this.alive) {
363                         try {
364                                 for (String event: this.consumer.fetch()) {                                     
365                                         synchronized (this) {
366                                                 this.recentEvents.add(event);
367                                         }
368                                         
369                                         if (networkLogger.isInfoEnabled()) {
370                                                 networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + 
371                                                                    this.topic + "]:" + 
372                                                                    event);
373                                         }
374                                         
375                                         PolicyLogger.info(className, this.topic + " <-- " + event);
376                                         broadcast(event);
377                                         
378                                         if (!this.alive)
379                                                 break;
380                                 }
381                         } catch (Exception e) {
382                                 PolicyLogger.error( MessageCodes.EXCEPTION_ERROR, className, e, "CONSUMER.FETCH", this.toString());
383                         }
384                 }
385                 
386                 PolicyLogger.warn(this.className, "Exiting: " + this);
387         }
388         
389         /**
390          * {@inheritDoc}
391          */
392         @Override
393         public boolean offer(String event) {
394                 PolicyLogger.info(className, "OFFER: " + event + " TO " + this);
395                 
396                 if (!this.alive) {
397                         throw new IllegalStateException(this + " is not alive.");
398                 }
399                 
400                 synchronized (this) {
401                         this.recentEvents.add(event);
402                 }
403                 
404                 if (networkLogger.isInfoEnabled()) {
405                         networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + 
406                                             this.topic + "]:" + 
407                                             event);
408                 }
409                 
410                 
411                 return broadcast(event);
412         }
413         
414
415         @Override
416         public String toString() {
417                 StringBuilder builder = new StringBuilder();
418                 builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
419                                 .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
420                                 .append(", fetchLimit=").append(fetchLimit)
421                                 .append(", consumer=").append(this.consumer).append(", alive=")
422                                 .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
423                                 .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
424                                 .append("]");
425                 return builder.toString();
426         }
427
428         /**
429          * {@inheritDoc}
430          */
431         @Override
432         public boolean isAlive() {
433                 return alive;
434         }
435
436         /**
437          * {@inheritDoc}
438          */
439         @Override
440         public String getConsumerGroup() {
441                 return consumerGroup;
442         }
443
444         /**
445          * {@inheritDoc}
446          */
447         @Override
448         public String getConsumerInstance() {
449                 return consumerInstance;
450         }
451
452         /**
453          * {@inheritDoc}
454          */
455         @Override
456         public void shutdown() throws IllegalStateException {
457                 this.stop();
458                 this.topicListeners.clear();
459         }
460         
461         /**
462          * {@inheritDoc}
463          */
464         @Override
465         public int getFetchTimeout() {
466                 return fetchTimeout;
467         }
468
469         /**
470          * {@inheritDoc}
471          */
472         @Override
473         public int getFetchLimit() {
474                 return fetchLimit;
475         }
476
477 }