0fa90b504c61651129a8d377e2a8df1fadaa5fab
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import org.openecomp.policy.drools.event.comm.TopicListener;
31 import org.openecomp.policy.drools.event.comm.bus.BusTopicSource;
32
33 /**
34  * This topic source implementation specializes in reading messages
35  * over a bus topic source and notifying its listeners
36  */
37 public abstract class SingleThreadedBusTopicSource 
38        extends BusTopicBase
39        implements Runnable, BusTopicSource {
40            
41         /**
42          * Not to be converted to PolicyLogger.
43          * This will contain all instract /out traffic and only that in a single file in a concise format.
44          */
45         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
46         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
47         
48         /**
49          * Bus consumer group
50          */
51         protected final String consumerGroup;
52         
53         /**
54          * Bus consumer instance
55          */
56         protected final String consumerInstance;
57         
58         /**
59          * Bus fetch timeout
60          */
61         protected final int fetchTimeout;
62         
63         /**
64          * Bus fetch limit
65          */
66         protected final int fetchLimit;
67         
68         /**
69          * Message Bus Consumer
70          */
71         protected BusConsumer consumer;
72         
73         /**
74          * Am I running?
75          * reflects invocation of start()/stop() 
76          * !locked & start() => alive
77          * stop() => !alive
78          */
79         protected volatile boolean alive = false;
80         
81         /**
82          * Independent thread reading message over my topic
83          */
84         protected Thread busPollerThread;
85         
86         /**
87          * All my subscribers for new message notifications
88          */
89         protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
90         
91
92         /**
93          * 
94          * @param servers Bus servers
95          * @param topic Bus Topic to be monitored
96          * @param apiKey Bus API Key (optional)
97          * @param apiSecret Bus API Secret (optional)
98          * @param consumerGroup Bus Reader Consumer Group
99          * @param consumerInstance Bus Reader Instance
100          * @param fetchTimeout Bus fetch timeout
101          * @param fetchLimit Bus fetch limit
102          * @param useHttps does the bus use https
103          * @param allowSelfSignedCerts are self-signed certificates allowed
104          * @throws IllegalArgumentException An invalid parameter passed in
105          */
106         public SingleThreadedBusTopicSource(List<String> servers, 
107                                                                                 String topic, 
108                                                                 String apiKey, 
109                                                                 String apiSecret, 
110                                                                 String consumerGroup, 
111                                                                 String consumerInstance,
112                                                                 int fetchTimeout,
113                                                                 int fetchLimit,
114                                                                 boolean useHttps,
115                                                                 boolean allowSelfSignedCerts) 
116         throws IllegalArgumentException {
117                 
118                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
119                 
120                 if (consumerGroup == null || consumerGroup.isEmpty()) {
121                         this.consumerGroup = UUID.randomUUID ().toString();
122                 } else {
123                         this.consumerGroup = consumerGroup;
124                 }
125                 
126                 if (consumerInstance == null || consumerInstance.isEmpty()) {
127                         this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
128                 } else {
129                         this.consumerInstance = consumerInstance;
130                 }
131                 
132                 if (fetchTimeout <= 0) {
133                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
134                 } else {
135                         this.fetchTimeout = fetchTimeout;
136                 }
137                 
138                 if (fetchLimit <= 0) {
139                         this.fetchLimit = NO_LIMIT_FETCH;
140                 } else {
141                         this.fetchLimit = fetchLimit;
142                 }
143                 
144         }
145         
146         /**
147          * Initialize the Bus client
148          */
149         public abstract void init() throws Exception;
150         
151         @Override
152         public void register(TopicListener topicListener) 
153                 throws IllegalArgumentException {               
154                 
155                 super.register(topicListener);
156                 
157                 try {
158                         if (!alive && !locked)
159                                 this.start();
160                         else
161                                 logger.info("{}: register: start not attempted", this);
162                 } catch (Exception e) {
163                         logger.warn("{}: cannot start after registration of because of: {}",
164                                 this, topicListener, e.getMessage(), e);
165                 }
166         }
167
168         @Override
169         public void unregister(TopicListener topicListener) {
170                 boolean stop = false;
171                 synchronized (this) {
172                         super.unregister(topicListener);
173                         stop = (this.topicListeners.isEmpty());
174                 }
175                 
176                 if (stop) {             
177                         this.stop();
178                 }
179         }
180         
181         @Override
182         public boolean start() throws IllegalStateException {           
183                 logger.info("{}: starting", this);
184                 
185                 synchronized(this) {
186                         
187                         if (alive)
188                                 return true;
189                         
190                         if (locked)
191                                 throw new IllegalStateException(this + " is locked.");
192                         
193                         if (this.busPollerThread == null || 
194                                 !this.busPollerThread.isAlive() || 
195                                 this.consumer == null) {
196                                 
197                                 try {
198                                         this.init();
199                                         this.alive = true;
200                                         this.busPollerThread = new Thread(this);
201                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
202                                         busPollerThread.start();
203                                 } catch (Exception e) {
204                                         logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
205                                         throw new IllegalStateException(e);
206                                 }
207                         }
208                 }
209                 
210                 return this.alive;
211         }
212
213         @Override
214         public boolean stop() {
215                 logger.info("{}: stopping", this);
216                 
217                 synchronized(this) {
218                         BusConsumer consumerCopy = this.consumer;
219                         
220                         this.alive = false;
221                         this.consumer = null;
222                         
223                         if (consumerCopy != null) {
224                                 try {
225                                         consumerCopy.close();
226                                 } catch (Exception e) {
227                                         logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
228                                 }
229                         }
230                 }
231                                                         
232                 Thread.yield();
233                                 
234                 return true;
235         }
236         
237         /**
238          * Run thread method for the Bus Reader
239          */
240         @Override
241         public void run() {
242                 while (this.alive) {
243                         try {
244                                 for (String event: this.consumer.fetch()) {                                     
245                                         synchronized (this) {
246                                                 this.recentEvents.add(event);
247                                         }
248                                         
249                                         netLogger.info("[IN|{}|{}]{}{}",
250                                                            this.getTopicCommInfrastructure(), this.topic, 
251                                                            System.lineSeparator(), event);
252                                         
253                                         broadcast(event);
254                                         
255                                         if (!this.alive)
256                                                 break;
257                                 }
258                         } catch (Exception e) {
259                                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
260                         }
261                 }
262                 
263                 logger.info("{}: exiting thread", this);
264         }
265         
266         /**
267          * {@inheritDoc}
268          */
269         @Override
270         public boolean offer(String event) {
271                 if (!this.alive) {
272                         throw new IllegalStateException(this + " is not alive.");
273                 }
274                 
275                 synchronized (this) {
276                         this.recentEvents.add(event);
277                 }
278                 
279                 netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic, 
280                                        System.lineSeparator(), event);
281                 
282                 
283                 return broadcast(event);
284         }
285         
286
287         @Override
288         public String toString() {
289                 StringBuilder builder = new StringBuilder();
290                 builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
291                                 .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
292                                 .append(", fetchLimit=").append(fetchLimit)
293                                 .append(", consumer=").append(this.consumer).append(", alive=")
294                                 .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
295                                 .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
296                                 .append("]");
297                 return builder.toString();
298         }
299
300         /**
301          * {@inheritDoc}
302          */
303         @Override
304         public boolean isAlive() {
305                 return alive;
306         }
307
308         /**
309          * {@inheritDoc}
310          */
311         @Override
312         public String getConsumerGroup() {
313                 return consumerGroup;
314         }
315
316         /**
317          * {@inheritDoc}
318          */
319         @Override
320         public String getConsumerInstance() {
321                 return consumerInstance;
322         }
323
324         /**
325          * {@inheritDoc}
326          */
327         @Override
328         public void shutdown() throws IllegalStateException {
329                 this.stop();
330                 this.topicListeners.clear();
331         }
332         
333         /**
334          * {@inheritDoc}
335          */
336         @Override
337         public int getFetchTimeout() {
338                 return fetchTimeout;
339         }
340
341         /**
342          * {@inheritDoc}
343          */
344         @Override
345         public int getFetchLimit() {
346                 return fetchLimit;
347         }
348
349 }