f9dd4852d539fe92c48e3d39f0699ac131f3c15f
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.UUID;
26
27 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
31 import org.onap.policy.common.utils.network.NetworkUtil;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This topic source implementation specializes in reading messages over a bus topic source and
37  * notifying its listeners.
38  */
39 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
40         implements Runnable, BusTopicSource, FilterableTopicSource {
41
42     /**
43      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
44      * that in a single file in a concise format.
45      */
46     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
47     private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
48
49     /**
50      * Bus consumer group.
51      */
52     protected final String consumerGroup;
53
54     /**
55      * Bus consumer instance.
56      */
57     protected final String consumerInstance;
58
59     /**
60      * Bus fetch timeout.
61      */
62     protected final int fetchTimeout;
63
64     /**
65      * Bus fetch limit.
66      */
67     protected final int fetchLimit;
68
69     /**
70      * Message Bus Consumer.
71      */
72     protected BusConsumer consumer;
73
74     /**
75      * Independent thread reading message over my topic.
76      */
77     protected Thread busPollerThread;
78
79
80     /**
81      * Constructor.
82      *
83      * @param busTopicParams topic parameters
84      * 
85      * @throws IllegalArgumentException An invalid parameter passed in
86      */
87     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
88
89         super(busTopicParams);
90
91         if (busTopicParams.isConsumerGroupNullOrEmpty()) {
92             this.consumerGroup = UUID.randomUUID().toString();
93         } else {
94             this.consumerGroup = busTopicParams.getConsumerGroup();
95         }
96
97         if (busTopicParams.isConsumerInstanceNullOrEmpty()) {
98             this.consumerInstance = NetworkUtil.getHostname();
99         } else {
100             this.consumerInstance = busTopicParams.getConsumerInstance();
101         }
102
103         if (busTopicParams.getFetchTimeout() <= 0) {
104             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
105         } else {
106             this.fetchTimeout = busTopicParams.getFetchTimeout();
107         }
108
109         if (busTopicParams.getFetchLimit() <= 0) {
110             this.fetchLimit = NO_LIMIT_FETCH;
111         } else {
112             this.fetchLimit = busTopicParams.getFetchLimit();
113         }
114
115     }
116
117     /**
118      * Initialize the Bus client.
119      */
120     public abstract void init() throws MalformedURLException;
121
122     @Override
123     public void register(TopicListener topicListener) {
124
125         super.register(topicListener);
126
127         try {
128             if (!alive && !locked) {
129                 this.start();
130             } else {
131                 logger.info("{}: register: start not attempted", this);
132             }
133         } catch (Exception e) {
134             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
135                     e);
136         }
137     }
138
139     @Override
140     public void unregister(TopicListener topicListener) {
141         boolean stop;
142         synchronized (this) {
143             super.unregister(topicListener);
144             stop = this.topicListeners.isEmpty();
145         }
146
147         if (stop) {
148             this.stop();
149         }
150     }
151
152     @Override
153     public boolean start() {
154         logger.info("{}: starting", this);
155
156         synchronized (this) {
157
158             if (alive) {
159                 return true;
160             }
161
162             if (locked) {
163                 throw new IllegalStateException(this + " is locked.");
164             }
165
166             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
167
168                 try {
169                     this.init();
170                     this.alive = true;
171                     this.busPollerThread = new Thread(this);
172                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
173                     busPollerThread.start();
174                 } catch (Exception e) {
175                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
176                     throw new IllegalStateException(e);
177                 }
178             }
179         }
180
181         return this.alive;
182     }
183
184     @Override
185     public boolean stop() {
186         logger.info("{}: stopping", this);
187
188         synchronized (this) {
189             BusConsumer consumerCopy = this.consumer;
190
191             this.alive = false;
192             this.consumer = null;
193
194             if (consumerCopy != null) {
195                 try {
196                     consumerCopy.close();
197                 } catch (Exception e) {
198                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
199                 }
200             }
201         }
202
203         Thread.yield();
204
205         return true;
206     }
207
208     /**
209      * Run thread method for the Bus Reader.
210      */
211     @Override
212     public void run() {
213         while (this.alive) {
214             try {
215                 for (String event : this.consumer.fetch()) {
216                     synchronized (this) {
217                         this.recentEvents.add(event);
218                     }
219
220                     netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
221                             System.lineSeparator(), event);
222
223                     broadcast(event);
224
225                     if (!this.alive) {
226                         break;
227                     }
228                 }
229             } catch (Exception e) {
230                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
231             }
232         }
233
234         logger.info("{}: exiting thread", this);
235     }
236
237     /**
238      * {@inheritDoc}
239      */
240     @Override
241     public boolean offer(String event) {
242         if (!this.alive) {
243             throw new IllegalStateException(this + " is not alive.");
244         }
245
246         synchronized (this) {
247             this.recentEvents.add(event);
248         }
249
250         netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
251
252
253         return broadcast(event);
254     }
255
256
257     @Override
258     public void setFilter(String filter) {
259         if (consumer instanceof FilterableBusConsumer) {
260             ((FilterableBusConsumer) consumer).setFilter(filter);
261
262         } else {
263             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
264         }
265     }
266
267     @Override
268     public String toString() {
269         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
270                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
271                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
272                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
273     }
274
275     /**
276      * {@inheritDoc}
277      */
278     @Override
279     public String getConsumerGroup() {
280         return consumerGroup;
281     }
282
283     /**
284      * {@inheritDoc}
285      */
286     @Override
287     public String getConsumerInstance() {
288         return consumerInstance;
289     }
290
291     /**
292      * {@inheritDoc}
293      */
294     @Override
295     public void shutdown() {
296         this.stop();
297         this.topicListeners.clear();
298     }
299
300     /**
301      * {@inheritDoc}
302      */
303     @Override
304     public int getFetchTimeout() {
305         return fetchTimeout;
306     }
307
308     /**
309      * {@inheritDoc}
310      */
311     @Override
312     public int getFetchLimit() {
313         return fetchLimit;
314     }
315
316 }