d3be9163b0a2370daae560bdedcbd621dc0114bc
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.apache.log4j.Logger;
28
29 import org.openecomp.policy.drools.event.comm.TopicListener;
30 import org.openecomp.policy.drools.event.comm.bus.BusTopicSource;
31 import org.openecomp.policy.common.logging.eelf.MessageCodes;
32 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
33
34 /**
35  * This topic source implementation specializes in reading messages
36  * over a bus topic source and notifying its listeners
37  */
38 public abstract class SingleThreadedBusTopicSource 
39        extends BusTopicBase
40        implements Runnable, BusTopicSource {
41            
42         private String className = SingleThreadedBusTopicSource.class.getName();
43         /**
44          * Not to be converted to PolicyLogger.
45          * This will contain all instract /out traffic and only that in a single file in a concise format.
46          */
47         protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER);
48         
49         /**
50          * Bus consumer group
51          */
52         protected final String consumerGroup;
53         
54         /**
55          * Bus consumer instance
56          */
57         protected final String consumerInstance;
58         
59         /**
60          * Bus fetch timeout
61          */
62         protected final int fetchTimeout;
63         
64         /**
65          * Bus fetch limit
66          */
67         protected final int fetchLimit;
68         
69         /**
70          * Message Bus Consumer
71          */
72         protected BusConsumer consumer;
73         
74         /**
75          * Am I running?
76          * reflects invocation of start()/stop() 
77          * !locked & start() => alive
78          * stop() => !alive
79          */
80         protected volatile boolean alive = false;
81         
82         /**
83          * Am I locked?
84          * reflects invocation of lock()/unlock() operations
85          * locked => !alive (but not in the other direction necessarily)
86          * locked => !offer, !run, !start, !stop (but this last one is obvious
87          *                                        since locked => !alive)
88          */
89         protected volatile boolean locked = false;
90         
91         /**
92          * Independent thread reading message over my topic
93          */
94         protected Thread busPollerThread;
95         
96         /**
97          * All my subscribers for new message notifications
98          */
99         protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
100         
101
102         /**
103          * 
104          * @param servers Bus servers
105          * @param topic Bus Topic to be monitored
106          * @param apiKey Bus API Key (optional)
107          * @param apiSecret Bus API Secret (optional)
108          * @param consumerGroup Bus Reader Consumer Group
109          * @param consumerInstance Bus Reader Instance
110          * @param fetchTimeout Bus fetch timeout
111          * @param fetchLimit Bus fetch limit
112          * @param useHttps does the bus use https
113          * @param allowSelfSignedCerts are self-signed certificates allowed
114          * @throws IllegalArgumentException An invalid parameter passed in
115          */
116         public SingleThreadedBusTopicSource(List<String> servers, 
117                                                                                 String topic, 
118                                                                 String apiKey, 
119                                                                 String apiSecret, 
120                                                                 String consumerGroup, 
121                                                                 String consumerInstance,
122                                                                 int fetchTimeout,
123                                                                 int fetchLimit,
124                                                                 boolean useHttps,
125                                                                 boolean allowSelfSignedCerts) 
126         throws IllegalArgumentException {
127                 
128                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
129                 
130                 if (consumerGroup == null || consumerGroup.isEmpty()) {
131                         this.consumerGroup = UUID.randomUUID ().toString();
132                 } else {
133                         this.consumerGroup = consumerGroup;
134                 }
135                 
136                 if (consumerInstance == null || consumerInstance.isEmpty()) {
137                         this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
138                 } else {
139                         this.consumerInstance = consumerInstance;
140                 }
141                 
142                 if (fetchTimeout <= 0) {
143                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
144                 } else {
145                         this.fetchTimeout = fetchTimeout;
146                 }
147                 
148                 if (fetchLimit <= 0) {
149                         this.fetchLimit = NO_LIMIT_FETCH;
150                 } else {
151                         this.fetchLimit = fetchLimit;
152                 }
153                 
154         }
155         
156         /**
157          * Initialize the Bus client
158          */
159         public abstract void init() throws Exception;
160         
161         /**
162          * {@inheritDoc}
163          */
164         @Override
165         public void register(TopicListener topicListener) 
166                 throws IllegalArgumentException {               
167                 
168                 PolicyLogger.info(className,"REGISTER: " + topicListener + " INTO " + this);
169                 
170                 synchronized(this) {
171                         if (topicListener == null)
172                                 throw new IllegalArgumentException("TopicListener must be provided");
173                         
174                         /* check that this listener is not registered already */
175                         for (TopicListener listener: this.topicListeners) {
176                                 if (listener == topicListener) {
177                                         // already registered
178                                         return;
179                                 }
180                         }
181                         
182                         this.topicListeners.add(topicListener);
183                 }
184                 
185                 try {
186                         this.start();
187                 } catch (Exception e) {
188                         PolicyLogger.info(className, "new registration of " + topicListener +  
189                                                   ",but can't start source because of " + e.getMessage());
190                 }
191         }
192
193         /**
194          * {@inheritDoc}
195          */
196         @Override
197         public void unregister(TopicListener topicListener) {
198                 
199                 PolicyLogger.info(className, "UNREGISTER: " + topicListener + " FROM " + this);
200                 
201                 boolean stop = false;
202                 synchronized (this) {
203                         if (topicListener == null)
204                                 throw new IllegalArgumentException("TopicListener must be provided");
205                         
206                         this.topicListeners.remove(topicListener);
207                         stop = (this.topicListeners.isEmpty());
208                 }
209                 
210                 if (stop) {             
211                         this.stop();
212                 }
213         }
214         
215         /**
216          * {@inheritDoc}
217          */
218         @Override
219         public boolean lock() { 
220                 PolicyLogger.info(className, "LOCK: " + this);
221                 
222                 synchronized (this) {
223                         if (this.locked)
224                                 return true;
225                         
226                         this.locked = true;
227                 }
228                 
229                 return this.stop();
230         }
231
232         /**
233          * {@inheritDoc}
234          */
235         @Override
236         public boolean unlock() {
237                 PolicyLogger.info(className, "UNLOCK: " + this);
238                 
239                 synchronized(this) {
240                         if (!this.locked)
241                                 return true;
242                         
243                         this.locked = false;
244                 }
245                 
246                 try {
247                         return this.start();
248                 } catch (Exception e) {
249                         PolicyLogger.warn("can't start after unlocking " + this + 
250                                                   " because of " + e.getMessage());
251                         return false;
252                 }
253         }
254         
255         /**
256          * {@inheritDoc}
257          */
258         @Override
259         public boolean start() throws IllegalStateException {
260                 
261                 PolicyLogger.info(className, "START: " + this);
262                 
263                 synchronized(this) {
264                         
265                         if (alive) {
266                                 return true;
267                         }
268                         
269                         if (locked) {
270                                 throw new IllegalStateException(this + " is locked.");
271                         }
272                         
273                         if (this.busPollerThread == null || 
274                                 !this.busPollerThread.isAlive() || 
275                                 this.consumer == null) {
276                                 
277                                 try {
278                                         this.init();
279                                         this.alive = true;
280                                         this.busPollerThread = new Thread(this);
281                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
282                                         busPollerThread.start();
283                                 } catch (Exception e) {
284                                         e.printStackTrace();
285                                         throw new IllegalStateException(e);
286                                 }
287                         }
288                 }
289                 
290                 return this.alive;
291         }
292
293         /**
294          * {@inheritDoc}
295          */
296         @Override
297         public boolean stop() {
298                 PolicyLogger.info(className, "STOP: " + this);
299                 
300                 synchronized(this) {
301                         BusConsumer consumerCopy = this.consumer;
302                         
303                         this.alive = false;
304                         this.consumer = null;
305                         
306                         if (consumerCopy != null) {
307                                 try {
308                                         consumerCopy.close();
309                                 } catch (Exception e) {
310                                         PolicyLogger.warn(MessageCodes.EXCEPTION_ERROR, e, "CONSUMER.CLOSE", this.toString());
311                                 }
312                         }
313                 }
314                                                         
315                 Thread.yield();
316                                 
317                 return true;
318         }
319
320         /**
321          * {@inheritDoc}
322          */
323         @Override
324         public boolean isLocked() {
325                 return this.locked;
326         }
327         
328         /**
329          * broadcast event to all listeners
330          * 
331          * @param message the event
332          * @return true if all notifications are performed with no error, false otherwise
333          */
334         protected boolean broadcast(String message) {
335                 
336                 /* take a snapshot of listeners */
337                 List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
338                 
339                 boolean success = true;
340                 for (TopicListener topicListener: snapshotListeners) {
341                         try {
342                                 topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
343                         } catch (Exception e) {
344                                 PolicyLogger.warn(this.className, "ERROR notifying " + topicListener.toString() + 
345                                                           " because of " + e.getMessage() + " @ " + this.toString());
346                                 success = false;
347                         }
348                 }
349                 return success;
350         }
351         
352         /**
353          * take a snapshot of current topic listeners
354          * 
355          * @return the topic listeners
356          */
357         protected synchronized List<TopicListener> snapshotTopicListeners() {
358                 @SuppressWarnings("unchecked")
359                 List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
360                 return listeners;
361         }
362         
363         /**
364          * Run thread method for the Bus Reader
365          */
366         @Override
367         public void run() {
368                 while (this.alive) {
369                         try {
370                                 for (String event: this.consumer.fetch()) {                                     
371                                         synchronized (this) {
372                                                 this.recentEvents.add(event);
373                                         }
374                                         
375                                         if (networkLogger.isInfoEnabled()) {
376                                                 networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + 
377                                                                    this.topic + "]:" + 
378                                                                    event);
379                                         }
380                                         
381                                         PolicyLogger.info(className, this.topic + " <-- " + event);
382                                         broadcast(event);
383                                         
384                                         if (!this.alive)
385                                                 break;
386                                 }
387                         } catch (Exception e) {
388                                 PolicyLogger.error( MessageCodes.EXCEPTION_ERROR, className, e, "CONSUMER.FETCH", this.toString());
389                         }
390                 }
391                 
392                 PolicyLogger.warn(this.className, "Exiting: " + this);
393         }
394         
395         /**
396          * {@inheritDoc}
397          */
398         @Override
399         public boolean offer(String event) {
400                 PolicyLogger.info(className, "OFFER: " + event + " TO " + this);
401                 
402                 if (!this.alive) {
403                         throw new IllegalStateException(this + " is not alive.");
404                 }
405                 
406                 synchronized (this) {
407                         this.recentEvents.add(event);
408                 }
409                 
410                 if (networkLogger.isInfoEnabled()) {
411                         networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + 
412                                             this.topic + "]:" + 
413                                             event);
414                 }
415                 
416                 
417                 return broadcast(event);
418         }
419         
420
421         @Override
422         public String toString() {
423                 StringBuilder builder = new StringBuilder();
424                 builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
425                                 .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
426                                 .append(", fetchLimit=").append(fetchLimit)
427                                 .append(", consumer=").append(this.consumer).append(", alive=")
428                                 .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
429                                 .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
430                                 .append("]");
431                 return builder.toString();
432         }
433
434         /**
435          * {@inheritDoc}
436          */
437         @Override
438         public boolean isAlive() {
439                 return alive;
440         }
441
442         /**
443          * {@inheritDoc}
444          */
445         @Override
446         public String getConsumerGroup() {
447                 return consumerGroup;
448         }
449
450         /**
451          * {@inheritDoc}
452          */
453         @Override
454         public String getConsumerInstance() {
455                 return consumerInstance;
456         }
457
458         /**
459          * {@inheritDoc}
460          */
461         @Override
462         public void shutdown() throws IllegalStateException {
463                 this.stop();
464                 this.topicListeners.clear();
465         }
466         
467         /**
468          * {@inheritDoc}
469          */
470         @Override
471         public int getFetchTimeout() {
472                 return fetchTimeout;
473         }
474
475         /**
476          * {@inheritDoc}
477          */
478         @Override
479         public int getFetchLimit() {
480                 return fetchLimit;
481         }
482
483 }