Set default consumer instance to hostname
[policy/drools-pdp.git] / policy-endpoints / src / main / java / org / onap / policy / drools / event / comm / bus / internal / SingleThreadedBusTopicSource.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.UUID;
27
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import org.onap.policy.drools.event.comm.FilterableTopicSource;
31 import org.onap.policy.drools.event.comm.TopicListener;
32 import org.onap.policy.drools.event.comm.bus.BusTopicSource;
33 import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
34 import org.onap.policy.drools.utils.NetworkUtil;
35
36 /**
37  * This topic source implementation specializes in reading messages
38  * over a bus topic source and notifying its listeners
39  */
40 public abstract class SingleThreadedBusTopicSource 
41        extends BusTopicBase
42        implements Runnable, BusTopicSource, FilterableTopicSource {
43            
44         /**
45          * Not to be converted to PolicyLogger.
46          * This will contain all instract /out traffic and only that in a single file in a concise format.
47          */
48         private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
49         private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
50         
51         /**
52          * Bus consumer group
53          */
54         protected final String consumerGroup;
55         
56         /**
57          * Bus consumer instance
58          */
59         protected final String consumerInstance;
60         
61         /**
62          * Bus fetch timeout
63          */
64         protected final int fetchTimeout;
65         
66         /**
67          * Bus fetch limit
68          */
69         protected final int fetchLimit;
70         
71         /**
72          * Message Bus Consumer
73          */
74         protected BusConsumer consumer;
75         
76         /**
77          * Am I running?
78          * reflects invocation of start()/stop() 
79          * !locked & start() => alive
80          * stop() => !alive
81          */
82         protected volatile boolean alive = false;
83         
84         /**
85          * Independent thread reading message over my topic
86          */
87         protected Thread busPollerThread;
88         
89         /**
90          * All my subscribers for new message notifications
91          */
92         protected final ArrayList<TopicListener> topicListeners = new ArrayList<>();
93         
94
95         /**
96          * 
97          * @param servers Bus servers
98          * @param topic Bus Topic to be monitored
99          * @param apiKey Bus API Key (optional)
100          * @param apiSecret Bus API Secret (optional)
101          * @param consumerGroup Bus Reader Consumer Group
102          * @param consumerInstance Bus Reader Instance
103          * @param fetchTimeout Bus fetch timeout
104          * @param fetchLimit Bus fetch limit
105          * @param useHttps does the bus use https
106          * @param allowSelfSignedCerts are self-signed certificates allowed
107          * @throws IllegalArgumentException An invalid parameter passed in
108          */
109         public SingleThreadedBusTopicSource(List<String> servers, 
110                                                                                 String topic, 
111                                                                 String apiKey, 
112                                                                 String apiSecret, 
113                                                                 String consumerGroup, 
114                                                                 String consumerInstance,
115                                                                 int fetchTimeout,
116                                                                 int fetchLimit,
117                                                                 boolean useHttps,
118                                                                 boolean allowSelfSignedCerts) 
119         throws IllegalArgumentException {
120                 
121                 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
122                 
123                 if (consumerGroup == null || consumerGroup.isEmpty()) {
124                         this.consumerGroup = UUID.randomUUID ().toString();
125                 } else {
126                         this.consumerGroup = consumerGroup;
127                 }
128                 
129                 if (consumerInstance == null || consumerInstance.isEmpty()) {
130             this.consumerInstance = NetworkUtil.getHostname();
131                 } else {
132                         this.consumerInstance = consumerInstance;
133                 }
134                 
135                 if (fetchTimeout <= 0) {
136                         this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
137                 } else {
138                         this.fetchTimeout = fetchTimeout;
139                 }
140                 
141                 if (fetchLimit <= 0) {
142                         this.fetchLimit = NO_LIMIT_FETCH;
143                 } else {
144                         this.fetchLimit = fetchLimit;
145                 }
146                 
147         }
148         
149         /**
150          * Initialize the Bus client
151          */
152         public abstract void init() throws MalformedURLException;
153         
154         @Override
155         public void register(TopicListener topicListener) 
156                 throws IllegalArgumentException {               
157                 
158                 super.register(topicListener);
159                 
160                 try {
161                         if (!alive && !locked)
162                                 this.start();
163                         else
164                                 logger.info("{}: register: start not attempted", this);
165                 } catch (Exception e) {
166                         logger.warn("{}: cannot start after registration of because of: {}",
167                                 this, topicListener, e.getMessage(), e);
168                 }
169         }
170
171         @Override
172         public void unregister(TopicListener topicListener) {
173                 boolean stop;
174                 synchronized (this) {
175                         super.unregister(topicListener);
176                         stop = this.topicListeners.isEmpty();
177                 }
178                 
179                 if (stop) {             
180                         this.stop();
181                 }
182         }
183         
184         @Override
185         public boolean start() throws IllegalStateException {           
186                 logger.info("{}: starting", this);
187                 
188                 synchronized(this) {
189                         
190                         if (alive)
191                                 return true;
192                         
193                         if (locked)
194                                 throw new IllegalStateException(this + " is locked.");
195                         
196                         if (this.busPollerThread == null || 
197                                 !this.busPollerThread.isAlive() || 
198                                 this.consumer == null) {
199                                 
200                                 try {
201                                         this.init();
202                                         this.alive = true;
203                                         this.busPollerThread = new Thread(this);
204                                         this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
205                                         busPollerThread.start();
206                                 } catch (Exception e) {
207                                         logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
208                                         throw new IllegalStateException(e);
209                                 }
210                         }
211                 }
212                 
213                 return this.alive;
214         }
215
216         @Override
217         public boolean stop() {
218                 logger.info("{}: stopping", this);
219                 
220                 synchronized(this) {
221                         BusConsumer consumerCopy = this.consumer;
222                         
223                         this.alive = false;
224                         this.consumer = null;
225                         
226                         if (consumerCopy != null) {
227                                 try {
228                                         consumerCopy.close();
229                                 } catch (Exception e) {
230                                         logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
231                                 }
232                         }
233                 }
234                                                         
235                 Thread.yield();
236                                 
237                 return true;
238         }
239         
240         /**
241          * Run thread method for the Bus Reader
242          */
243         @Override
244         public void run() {
245                 while (this.alive) {
246                         try {
247                                 for (String event: this.consumer.fetch()) {                                     
248                                         synchronized (this) {
249                                                 this.recentEvents.add(event);
250                                         }
251                                         
252                                         netLogger.info("[IN|{}|{}]{}{}",
253                                                            this.getTopicCommInfrastructure(), this.topic, 
254                                                            System.lineSeparator(), event);
255                                         
256                                         broadcast(event);
257                                         
258                                         if (!this.alive)
259                                                 break;
260                                 }
261                         } catch (Exception e) {
262                                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
263                         }
264                 }
265                 
266                 logger.info("{}: exiting thread", this);
267         }
268         
269         /**
270          * {@inheritDoc}
271          */
272         @Override
273         public boolean offer(String event) {
274                 if (!this.alive) {
275                         throw new IllegalStateException(this + " is not alive.");
276                 }
277                 
278                 synchronized (this) {
279                         this.recentEvents.add(event);
280                 }
281                 
282                 netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic, 
283                                        System.lineSeparator(), event);
284                 
285                 
286                 return broadcast(event);
287         }
288         
289
290         @Override
291     public void setFilter(String filter) {
292             if(consumer instanceof FilterableBusConsumer) {
293                 ((FilterableBusConsumer) consumer).setFilter(filter);
294                 
295             } else {
296                 throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
297             }
298     }
299
300     @Override
301         public String toString() {
302                 StringBuilder builder = new StringBuilder();
303                 builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
304                                 .append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
305                                 .append(", fetchLimit=").append(fetchLimit)
306                                 .append(", consumer=").append(this.consumer).append(", alive=")
307                                 .append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
308                                 .append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
309                                 .append("]");
310                 return builder.toString();
311         }
312
313         /**
314          * {@inheritDoc}
315          */
316         @Override
317         public boolean isAlive() {
318                 return alive;
319         }
320
321         /**
322          * {@inheritDoc}
323          */
324         @Override
325         public String getConsumerGroup() {
326                 return consumerGroup;
327         }
328
329         /**
330          * {@inheritDoc}
331          */
332         @Override
333         public String getConsumerInstance() {
334                 return consumerInstance;
335         }
336
337         /**
338          * {@inheritDoc}
339          */
340         @Override
341         public void shutdown() throws IllegalStateException {
342                 this.stop();
343                 this.topicListeners.clear();
344         }
345         
346         /**
347          * {@inheritDoc}
348          */
349         @Override
350         public int getFetchTimeout() {
351                 return fetchTimeout;
352         }
353
354         /**
355          * {@inheritDoc}
356          */
357         @Override
358         public int getFetchLimit() {
359                 return fetchLimit;
360         }
361
362 }