Close old UEB/DMaaP consumer
[policy/drools-pdp.git] / policy-endpoints / src / main / java / org / onap / policy / drools / event / comm / bus / internal / SingleThreadedBusTopicSource.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.UUID;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.onap.policy.drools.event.comm.FilterableTopicSource;
31 import org.onap.policy.drools.event.comm.TopicListener;
32 import org.onap.policy.drools.event.comm.bus.BusTopicSource;
33 import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
34
35 /**
36  * This topic source implementation specializes in reading messages
37  * over a bus topic source and notifying its listeners
38  */
39 public abstract class SingleThreadedBusTopicSource 
40        extends BusTopicBase
41        implements Runnable, BusTopicSource, FilterableTopicSource {
42            
43         /**
44          * Not to be converted to PolicyLogger.
45          * This will contain all instract /out traffic and only that in a single file in a concise format.
46          */
47         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
48         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
49         
50         /**
51          * Bus consumer group
52          */
53         protected final String consumerGroup;
54         
55         /**
56          * Bus consumer instance
57          */
58         protected final String consumerInstance;
59         
60         /**
61          * Bus fetch timeout
62          */
63         protected final int fetchTimeout;
64         
65         /**
66          * Bus fetch limit
67          */
68         protected final int fetchLimit;
69         
70         /**
71          * Message Bus Consumer
72          */
73         protected BusConsumer consumer;
74         
75         /**
76          * Am I running?
77          * reflects invocation of start()/stop() 
78          * !locked & start() => alive
79          * stop() => !alive
80          */
81         protected volatile boolean alive = false;
82         
83         /**
84          * Independent thread reading message over my topic
85          */
86         protected Thread busPollerThread;
87         
88         /**
89          * All my subscribers for new message notifications
90          */
91         protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
92         
93
94         /**
95          * 
96          * @param servers Bus servers
97          * @param topic Bus Topic to be monitored
98          * @param apiKey Bus API Key (optional)
99          * @param apiSecret Bus API Secret (optional)
100          * @param consumerGroup Bus Reader Consumer Group
101          * @param consumerInstance Bus Reader Instance
102          * @param fetchTimeout Bus fetch timeout
103          * @param fetchLimit Bus fetch limit
104          * @param useHttps does the bus use https
105          * @param allowSelfSignedCerts are self-signed certificates allowed
106          * @throws IllegalArgumentException An invalid parameter passed in
107          */
108         public SingleThreadedBusTopicSource(List<String> servers, 
109                                                                                 String topic, 
110                                                                 String apiKey, 
111                                                                 String apiSecret, 
112                                                                 String consumerGroup, 
113                                                                 String consumerInstance,
114                                                                 int fetchTimeout,
115                                                                 int fetchLimit,
116                                                                 boolean useHttps,
117                                                                 boolean allowSelfSignedCerts) 
118         throws IllegalArgumentException {
119                 
120                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
121                 
122                 if (consumerGroup == null || consumerGroup.isEmpty()) {
123                         this.consumerGroup = UUID.randomUUID ().toString();
124                 } else {
125                         this.consumerGroup = consumerGroup;
126                 }
127                 
128                 if (consumerInstance == null || consumerInstance.isEmpty()) {
129                         this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
130                 } else {
131                         this.consumerInstance = consumerInstance;
132                 }
133                 
134                 if (fetchTimeout <= 0) {
135                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
136                 } else {
137                         this.fetchTimeout = fetchTimeout;
138                 }
139                 
140                 if (fetchLimit <= 0) {
141                         this.fetchLimit = NO_LIMIT_FETCH;
142                 } else {
143                         this.fetchLimit = fetchLimit;
144                 }
145                 
146         }
147         
148         /**
149          * Initialize the Bus client
150          */
151         public abstract void init() throws MalformedURLException;
152         
153         @Override
154         public void register(TopicListener topicListener) 
155                 throws IllegalArgumentException {               
156                 
157                 super.register(topicListener);
158                 
159                 try {
160                         if (!alive && !locked)
161                                 this.start();
162                         else
163                                 logger.info("{}: register: start not attempted", this);
164                 } catch (Exception e) {
165                         logger.warn("{}: cannot start after registration of because of: {}",
166                                 this, topicListener, e.getMessage(), e);
167                 }
168         }
169
170         @Override
171         public void unregister(TopicListener topicListener) {
172                 boolean stop;
173                 synchronized (this) {
174                         super.unregister(topicListener);
175                         stop = this.topicListeners.isEmpty();
176                 }
177                 
178                 if (stop) {             
179                         this.stop();
180                 }
181         }
182         
183         @Override
184         public boolean start() throws IllegalStateException {           
185                 logger.info("{}: starting", this);
186                 
187                 synchronized(this) {
188                         
189                         if (alive)
190                                 return true;
191                         
192                         if (locked)
193                                 throw new IllegalStateException(this + " is locked.");
194                         
195                         if (this.busPollerThread == null || 
196                                 !this.busPollerThread.isAlive() || 
197                                 this.consumer == null) {
198                                 
199                                 try {
200                                         this.init();
201                                         this.alive = true;
202                                         this.busPollerThread = new Thread(this);
203                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
204                                         busPollerThread.start();
205                                 } catch (Exception e) {
206                                         logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
207                                         throw new IllegalStateException(e);
208                                 }
209                         }
210                 }
211                 
212                 return this.alive;
213         }
214
215         @Override
216         public boolean stop() {
217                 logger.info("{}: stopping", this);
218                 
219                 synchronized(this) {
220                         BusConsumer consumerCopy = this.consumer;
221                         
222                         this.alive = false;
223                         this.consumer = null;
224                         
225                         if (consumerCopy != null) {
226                                 try {
227                                         consumerCopy.close();
228                                 } catch (Exception e) {
229                                         logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
230                                 }
231                         }
232                 }
233                                                         
234                 Thread.yield();
235                                 
236                 return true;
237         }
238         
239         /**
240          * Run thread method for the Bus Reader
241          */
242         @Override
243         public void run() {
244                 while (this.alive) {
245                         try {
246                                 for (String event: this.consumer.fetch()) {                                     
247                                         synchronized (this) {
248                                                 this.recentEvents.add(event);
249                                         }
250                                         
251                                         netLogger.info("[IN|{}|{}]{}{}",
252                                                            this.getTopicCommInfrastructure(), this.topic, 
253                                                            System.lineSeparator(), event);
254                                         
255                                         broadcast(event);
256                                         
257                                         if (!this.alive)
258                                                 break;
259                                 }
260                         } catch (Exception e) {
261                                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
262                         }
263                 }
264                 
265                 logger.info("{}: exiting thread", this);
266         }
267         
268         /**
269          * {@inheritDoc}
270          */
271         @Override
272         public boolean offer(String event) {
273                 if (!this.alive) {
274                         throw new IllegalStateException(this + " is not alive.");
275                 }
276                 
277                 synchronized (this) {
278                         this.recentEvents.add(event);
279                 }
280                 
281                 netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic, 
282                                        System.lineSeparator(), event);
283                 
284                 
285                 return broadcast(event);
286         }
287         
288
289         @Override
290     public void setFilter(String filter) {
291             if(consumer instanceof FilterableBusConsumer) {
292                 ((FilterableBusConsumer) consumer).setFilter(filter);
293                 
294             } else {
295                 throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
296             }
297     }
298
299     @Override
300         public String toString() {
301                 StringBuilder builder = new StringBuilder();
302                 builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
303                                 .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
304                                 .append(", fetchLimit=").append(fetchLimit)
305                                 .append(", consumer=").append(this.consumer).append(", alive=")
306                                 .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
307                                 .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
308                                 .append("]");
309                 return builder.toString();
310         }
311
312         /**
313          * {@inheritDoc}
314          */
315         @Override
316         public boolean isAlive() {
317                 return alive;
318         }
319
320         /**
321          * {@inheritDoc}
322          */
323         @Override
324         public String getConsumerGroup() {
325                 return consumerGroup;
326         }
327
328         /**
329          * {@inheritDoc}
330          */
331         @Override
332         public String getConsumerInstance() {
333                 return consumerInstance;
334         }
335
336         /**
337          * {@inheritDoc}
338          */
339         @Override
340         public void shutdown() throws IllegalStateException {
341                 this.stop();
342                 this.topicListeners.clear();
343         }
344         
345         /**
346          * {@inheritDoc}
347          */
348         @Override
349         public int getFetchTimeout() {
350                 return fetchTimeout;
351         }
352
353         /**
354          * {@inheritDoc}
355          */
356         @Override
357         public int getFetchLimit() {
358                 return fetchLimit;
359         }
360
361 }