b7df8ca3679a853a1e12bbd4066ad0674fa30135
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.UUID;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import org.onap.policy.drools.event.comm.TopicListener;
32 import org.onap.policy.drools.event.comm.bus.BusTopicSource;
33
34 /**
35  * This topic source implementation specializes in reading messages
36  * over a bus topic source and notifying its listeners
37  */
38 public abstract class SingleThreadedBusTopicSource 
39        extends BusTopicBase
40        implements Runnable, BusTopicSource {
41            
42         /**
43          * Not to be converted to PolicyLogger.
44          * This will contain all instract /out traffic and only that in a single file in a concise format.
45          */
46         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
47         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
48         
49         /**
50          * Bus consumer group
51          */
52         protected final String consumerGroup;
53         
54         /**
55          * Bus consumer instance
56          */
57         protected final String consumerInstance;
58         
59         /**
60          * Bus fetch timeout
61          */
62         protected final int fetchTimeout;
63         
64         /**
65          * Bus fetch limit
66          */
67         protected final int fetchLimit;
68         
69         /**
70          * Message Bus Consumer
71          */
72         protected BusConsumer consumer;
73         
74         /**
75          * Am I running?
76          * reflects invocation of start()/stop() 
77          * !locked & start() => alive
78          * stop() => !alive
79          */
80         protected volatile boolean alive = false;
81         
82         /**
83          * Independent thread reading message over my topic
84          */
85         protected Thread busPollerThread;
86         
87         /**
88          * All my subscribers for new message notifications
89          */
90         protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
91         
92
93         /**
94          * 
95          * @param servers Bus servers
96          * @param topic Bus Topic to be monitored
97          * @param apiKey Bus API Key (optional)
98          * @param apiSecret Bus API Secret (optional)
99          * @param consumerGroup Bus Reader Consumer Group
100          * @param consumerInstance Bus Reader Instance
101          * @param fetchTimeout Bus fetch timeout
102          * @param fetchLimit Bus fetch limit
103          * @param useHttps does the bus use https
104          * @param allowSelfSignedCerts are self-signed certificates allowed
105          * @throws IllegalArgumentException An invalid parameter passed in
106          */
107         public SingleThreadedBusTopicSource(List<String> servers, 
108                                                                                 String topic, 
109                                                                 String apiKey, 
110                                                                 String apiSecret, 
111                                                                 String consumerGroup, 
112                                                                 String consumerInstance,
113                                                                 int fetchTimeout,
114                                                                 int fetchLimit,
115                                                                 boolean useHttps,
116                                                                 boolean allowSelfSignedCerts) 
117         throws IllegalArgumentException {
118                 
119                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
120                 
121                 if (consumerGroup == null || consumerGroup.isEmpty()) {
122                         this.consumerGroup = UUID.randomUUID ().toString();
123                 } else {
124                         this.consumerGroup = consumerGroup;
125                 }
126                 
127                 if (consumerInstance == null || consumerInstance.isEmpty()) {
128                         this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
129                 } else {
130                         this.consumerInstance = consumerInstance;
131                 }
132                 
133                 if (fetchTimeout <= 0) {
134                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
135                 } else {
136                         this.fetchTimeout = fetchTimeout;
137                 }
138                 
139                 if (fetchLimit <= 0) {
140                         this.fetchLimit = NO_LIMIT_FETCH;
141                 } else {
142                         this.fetchLimit = fetchLimit;
143                 }
144                 
145         }
146         
147         /**
148          * Initialize the Bus client
149          */
150         public abstract void init() throws MalformedURLException;
151         
152         @Override
153         public void register(TopicListener topicListener) 
154                 throws IllegalArgumentException {               
155                 
156                 super.register(topicListener);
157                 
158                 try {
159                         if (!alive && !locked)
160                                 this.start();
161                         else
162                                 logger.info("{}: register: start not attempted", this);
163                 } catch (Exception e) {
164                         logger.warn("{}: cannot start after registration of because of: {}",
165                                 this, topicListener, e.getMessage(), e);
166                 }
167         }
168
169         @Override
170         public void unregister(TopicListener topicListener) {
171                 boolean stop;
172                 synchronized (this) {
173                         super.unregister(topicListener);
174                         stop = this.topicListeners.isEmpty();
175                 }
176                 
177                 if (stop) {             
178                         this.stop();
179                 }
180         }
181         
182         @Override
183         public boolean start() throws IllegalStateException {           
184                 logger.info("{}: starting", this);
185                 
186                 synchronized(this) {
187                         
188                         if (alive)
189                                 return true;
190                         
191                         if (locked)
192                                 throw new IllegalStateException(this + " is locked.");
193                         
194                         if (this.busPollerThread == null || 
195                                 !this.busPollerThread.isAlive() || 
196                                 this.consumer == null) {
197                                 
198                                 try {
199                                         this.init();
200                                         this.alive = true;
201                                         this.busPollerThread = new Thread(this);
202                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
203                                         busPollerThread.start();
204                                 } catch (Exception e) {
205                                         logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
206                                         throw new IllegalStateException(e);
207                                 }
208                         }
209                 }
210                 
211                 return this.alive;
212         }
213
214         @Override
215         public boolean stop() {
216                 logger.info("{}: stopping", this);
217                 
218                 synchronized(this) {
219                         BusConsumer consumerCopy = this.consumer;
220                         
221                         this.alive = false;
222                         this.consumer = null;
223                         
224                         if (consumerCopy != null) {
225                                 try {
226                                         consumerCopy.close();
227                                 } catch (Exception e) {
228                                         logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
229                                 }
230                         }
231                 }
232                                                         
233                 Thread.yield();
234                                 
235                 return true;
236         }
237         
238         /**
239          * Run thread method for the Bus Reader
240          */
241         @Override
242         public void run() {
243                 while (this.alive) {
244                         try {
245                                 for (String event: this.consumer.fetch()) {                                     
246                                         synchronized (this) {
247                                                 this.recentEvents.add(event);
248                                         }
249                                         
250                                         netLogger.info("[IN|{}|{}]{}{}",
251                                                            this.getTopicCommInfrastructure(), this.topic, 
252                                                            System.lineSeparator(), event);
253                                         
254                                         broadcast(event);
255                                         
256                                         if (!this.alive)
257                                                 break;
258                                 }
259                         } catch (Exception e) {
260                                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
261                         }
262                 }
263                 
264                 logger.info("{}: exiting thread", this);
265         }
266         
267         /**
268          * {@inheritDoc}
269          */
270         @Override
271         public boolean offer(String event) {
272                 if (!this.alive) {
273                         throw new IllegalStateException(this + " is not alive.");
274                 }
275                 
276                 synchronized (this) {
277                         this.recentEvents.add(event);
278                 }
279                 
280                 netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic, 
281                                        System.lineSeparator(), event);
282                 
283                 
284                 return broadcast(event);
285         }
286         
287
288         @Override
289         public String toString() {
290                 StringBuilder builder = new StringBuilder();
291                 builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
292                                 .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
293                                 .append(", fetchLimit=").append(fetchLimit)
294                                 .append(", consumer=").append(this.consumer).append(", alive=")
295                                 .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
296                                 .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
297                                 .append("]");
298                 return builder.toString();
299         }
300
301         /**
302          * {@inheritDoc}
303          */
304         @Override
305         public boolean isAlive() {
306                 return alive;
307         }
308
309         /**
310          * {@inheritDoc}
311          */
312         @Override
313         public String getConsumerGroup() {
314                 return consumerGroup;
315         }
316
317         /**
318          * {@inheritDoc}
319          */
320         @Override
321         public String getConsumerInstance() {
322                 return consumerInstance;
323         }
324
325         /**
326          * {@inheritDoc}
327          */
328         @Override
329         public void shutdown() throws IllegalStateException {
330                 this.stop();
331                 this.topicListeners.clear();
332         }
333         
334         /**
335          * {@inheritDoc}
336          */
337         @Override
338         public int getFetchTimeout() {
339                 return fetchTimeout;
340         }
341
342         /**
343          * {@inheritDoc}
344          */
345         @Override
346         public int getFetchLimit() {
347                 return fetchLimit;
348         }
349
350 }