768046d0fec76cb6eedbdadcbfa903c5b07f45dc
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.UUID;
26 import org.onap.policy.drools.event.comm.FilterableTopicSource;
27 import org.onap.policy.drools.event.comm.TopicListener;
28 import org.onap.policy.drools.event.comm.bus.BusTopicSource;
29 import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
30 import org.onap.policy.drools.utils.NetworkUtil;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * This topic source implementation specializes in reading messages
36  * over a bus topic source and notifying its listeners
37  */
38 public abstract class SingleThreadedBusTopicSource 
39        extends BusTopicBase
40        implements Runnable, BusTopicSource, FilterableTopicSource {
41            
42         /**
43          * Not to be converted to PolicyLogger.
44          * This will contain all instract /out traffic and only that in a single file in a concise format.
45          */
46         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
47         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
48         
49         /**
50          * Bus consumer group
51          */
52         protected final String consumerGroup;
53         
54         /**
55          * Bus consumer instance
56          */
57         protected final String consumerInstance;
58         
59         /**
60          * Bus fetch timeout
61          */
62         protected final int fetchTimeout;
63         
64         /**
65          * Bus fetch limit
66          */
67         protected final int fetchLimit;
68         
69         /**
70          * Message Bus Consumer
71          */
72         protected BusConsumer consumer;
73         
74         /**
75          * Independent thread reading message over my topic
76          */
77         protected Thread busPollerThread;
78         
79
80         /**
81          * 
82          * @param servers Bus servers
83          * @param topic Bus Topic to be monitored
84          * @param apiKey Bus API Key (optional)
85          * @param apiSecret Bus API Secret (optional)
86          * @param consumerGroup Bus Reader Consumer Group
87          * @param consumerInstance Bus Reader Instance
88          * @param fetchTimeout Bus fetch timeout
89          * @param fetchLimit Bus fetch limit
90          * @param useHttps does the bus use https
91          * @param allowSelfSignedCerts are self-signed certificates allowed
92          * @throws IllegalArgumentException An invalid parameter passed in
93          */
94         public SingleThreadedBusTopicSource(List<String> servers, 
95                                                                                 String topic, 
96                                                                 String apiKey, 
97                                                                 String apiSecret, 
98                                                                 String consumerGroup, 
99                                                                 String consumerInstance,
100                                                                 int fetchTimeout,
101                                                                 int fetchLimit,
102                                                                 boolean useHttps,
103                                                                 boolean allowSelfSignedCerts) {
104                 
105                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
106                 
107                 if (consumerGroup == null || consumerGroup.isEmpty()) {
108                         this.consumerGroup = UUID.randomUUID ().toString();
109                 } else {
110                         this.consumerGroup = consumerGroup;
111                 }
112                 
113                 if (consumerInstance == null || consumerInstance.isEmpty()) {
114             this.consumerInstance = NetworkUtil.getHostname();
115                 } else {
116                         this.consumerInstance = consumerInstance;
117                 }
118                 
119                 if (fetchTimeout <= 0) {
120                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
121                 } else {
122                         this.fetchTimeout = fetchTimeout;
123                 }
124                 
125                 if (fetchLimit <= 0) {
126                         this.fetchLimit = NO_LIMIT_FETCH;
127                 } else {
128                         this.fetchLimit = fetchLimit;
129                 }
130                 
131         }
132         
133         /**
134          * Initialize the Bus client
135          */
136         public abstract void init() throws MalformedURLException;
137         
138         @Override
139         public void register(TopicListener topicListener) {
140                 
141                 super.register(topicListener);
142                 
143                 try {
144                         if (!alive && !locked)
145                                 this.start();
146                         else
147                                 logger.info("{}: register: start not attempted", this);
148                 } catch (Exception e) {
149                         logger.warn("{}: cannot start after registration of because of: {}",
150                                 this, topicListener, e.getMessage(), e);
151                 }
152         }
153
154         @Override
155         public void unregister(TopicListener topicListener) {
156                 boolean stop;
157                 synchronized (this) {
158                         super.unregister(topicListener);
159                         stop = this.topicListeners.isEmpty();
160                 }
161                 
162                 if (stop) {             
163                         this.stop();
164                 }
165         }
166         
167         @Override
168         public boolean start() {
169                 logger.info("{}: starting", this);
170                 
171                 synchronized(this) {
172                         
173                         if (alive)
174                                 return true;
175                         
176                         if (locked)
177                                 throw new IllegalStateException(this + " is locked.");
178                         
179                         if (this.busPollerThread == null || 
180                                 !this.busPollerThread.isAlive() || 
181                                 this.consumer == null) {
182                                 
183                                 try {
184                                         this.init();
185                                         this.alive = true;
186                                         this.busPollerThread = new Thread(this);
187                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
188                                         busPollerThread.start();
189                                 } catch (Exception e) {
190                                         logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
191                                         throw new IllegalStateException(e);
192                                 }
193                         }
194                 }
195                 
196                 return this.alive;
197         }
198
199         @Override
200         public boolean stop() {
201                 logger.info("{}: stopping", this);
202                 
203                 synchronized(this) {
204                         BusConsumer consumerCopy = this.consumer;
205                         
206                         this.alive = false;
207                         this.consumer = null;
208                         
209                         if (consumerCopy != null) {
210                                 try {
211                                         consumerCopy.close();
212                                 } catch (Exception e) {
213                                         logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
214                                 }
215                         }
216                 }
217                                                         
218                 Thread.yield();
219                                 
220                 return true;
221         }
222         
223         /**
224          * Run thread method for the Bus Reader
225          */
226         @Override
227         public void run() {
228                 while (this.alive) {
229                         try {
230                                 for (String event: this.consumer.fetch()) {                                     
231                                         synchronized (this) {
232                                                 this.recentEvents.add(event);
233                                         }
234                                         
235                                         netLogger.info("[IN|{}|{}]{}{}",
236                                                            this.getTopicCommInfrastructure(), this.topic, 
237                                                            System.lineSeparator(), event);
238                                         
239                                         broadcast(event);
240                                         
241                                         if (!this.alive)
242                                                 break;
243                                 }
244                         } catch (Exception e) {
245                                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
246                         }
247                 }
248                 
249                 logger.info("{}: exiting thread", this);
250         }
251         
252         /**
253          * {@inheritDoc}
254          */
255         @Override
256         public boolean offer(String event) {
257                 if (!this.alive) {
258                         throw new IllegalStateException(this + " is not alive.");
259                 }
260                 
261                 synchronized (this) {
262                         this.recentEvents.add(event);
263                 }
264                 
265                 netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic, 
266                                        System.lineSeparator(), event);
267                 
268                 
269                 return broadcast(event);
270         }
271         
272
273         @Override
274     public void setFilter(String filter) {
275             if(consumer instanceof FilterableBusConsumer) {
276                 ((FilterableBusConsumer) consumer).setFilter(filter);
277                 
278             } else {
279                 throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
280             }
281     }
282
283     @Override
284         public String toString() {
285         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
286                         + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer="
287                         + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread
288                         + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]";
289         }
290
291         /**
292          * {@inheritDoc}
293          */
294         @Override
295         public String getConsumerGroup() {
296                 return consumerGroup;
297         }
298
299         /**
300          * {@inheritDoc}
301          */
302         @Override
303         public String getConsumerInstance() {
304                 return consumerInstance;
305         }
306
307         /**
308          * {@inheritDoc}
309          */
310         @Override
311         public void shutdown() {
312                 this.stop();
313                 this.topicListeners.clear();
314         }
315         
316         /**
317          * {@inheritDoc}
318          */
319         @Override
320         public int getFetchTimeout() {
321                 return fetchTimeout;
322         }
323
324         /**
325          * {@inheritDoc}
326          */
327         @Override
328         public int getFetchLimit() {
329                 return fetchLimit;
330         }
331
332 }