e4b335c36f9b600ec391027574e45f3a35b5c4ad
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.UUID;
26
27 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
31 import org.onap.policy.common.utils.network.NetworkUtil;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This topic source implementation specializes in reading messages over a bus topic source and
37  * notifying its listeners.
38  */
39 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
40         implements Runnable, BusTopicSource, FilterableTopicSource {
41
42     /**
43      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
44      * that in a single file in a concise format.
45      */
46     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
47     private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
48
49     /**
50      * Bus consumer group.
51      */
52     protected final String consumerGroup;
53
54     /**
55      * Bus consumer instance.
56      */
57     protected final String consumerInstance;
58
59     /**
60      * Bus fetch timeout.
61      */
62     protected final int fetchTimeout;
63
64     /**
65      * Bus fetch limit.
66      */
67     protected final int fetchLimit;
68
69     /**
70      * Message Bus Consumer.
71      */
72     protected BusConsumer consumer;
73
74     /**
75      * Independent thread reading message over my topic.
76      */
77     protected Thread busPollerThread;
78
79
80     /**
81      * 
82      *
83      * @param busTopicParams@throws IllegalArgumentException An invalid parameter passed in
84      */
85     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
86
87         super(busTopicParams);
88
89         if (busTopicParams.isConsumerGroupNullOrEmpty()) {
90             this.consumerGroup = UUID.randomUUID().toString();
91         } else {
92             this.consumerGroup = busTopicParams.getConsumerGroup();
93         }
94
95         if (busTopicParams.isConsumerInstanceNullOrEmpty()) {
96             this.consumerInstance = NetworkUtil.getHostname();
97         } else {
98             this.consumerInstance = busTopicParams.getConsumerInstance();
99         }
100
101         if (busTopicParams.getFetchTimeout() <= 0) {
102             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
103         } else {
104             this.fetchTimeout = busTopicParams.getFetchTimeout();
105         }
106
107         if (busTopicParams.getFetchLimit() <= 0) {
108             this.fetchLimit = NO_LIMIT_FETCH;
109         } else {
110             this.fetchLimit = busTopicParams.getFetchLimit();
111         }
112
113     }
114
115     /**
116      * Initialize the Bus client.
117      */
118     public abstract void init() throws MalformedURLException;
119
120     @Override
121     public void register(TopicListener topicListener) {
122
123         super.register(topicListener);
124
125         try {
126             if (!alive && !locked) {
127                 this.start();
128             } else {
129                 logger.info("{}: register: start not attempted", this);
130             }
131         } catch (Exception e) {
132             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
133                     e);
134         }
135     }
136
137     @Override
138     public void unregister(TopicListener topicListener) {
139         boolean stop;
140         synchronized (this) {
141             super.unregister(topicListener);
142             stop = this.topicListeners.isEmpty();
143         }
144
145         if (stop) {
146             this.stop();
147         }
148     }
149
150     @Override
151     public boolean start() {
152         logger.info("{}: starting", this);
153
154         synchronized (this) {
155
156             if (alive) {
157                 return true;
158             }
159
160             if (locked) {
161                 throw new IllegalStateException(this + " is locked.");
162             }
163
164             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
165
166                 try {
167                     this.init();
168                     this.alive = true;
169                     this.busPollerThread = new Thread(this);
170                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
171                     busPollerThread.start();
172                 } catch (Exception e) {
173                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
174                     throw new IllegalStateException(e);
175                 }
176             }
177         }
178
179         return this.alive;
180     }
181
182     @Override
183     public boolean stop() {
184         logger.info("{}: stopping", this);
185
186         synchronized (this) {
187             BusConsumer consumerCopy = this.consumer;
188
189             this.alive = false;
190             this.consumer = null;
191
192             if (consumerCopy != null) {
193                 try {
194                     consumerCopy.close();
195                 } catch (Exception e) {
196                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
197                 }
198             }
199         }
200
201         Thread.yield();
202
203         return true;
204     }
205
206     /**
207      * Run thread method for the Bus Reader.
208      */
209     @Override
210     public void run() {
211         while (this.alive) {
212             try {
213                 for (String event : this.consumer.fetch()) {
214                     synchronized (this) {
215                         this.recentEvents.add(event);
216                     }
217
218                     netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
219                             System.lineSeparator(), event);
220
221                     broadcast(event);
222
223                     if (!this.alive) {
224                         break;
225                     }
226                 }
227             } catch (Exception e) {
228                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
229             }
230         }
231
232         logger.info("{}: exiting thread", this);
233     }
234
235     /**
236      * {@inheritDoc}
237      */
238     @Override
239     public boolean offer(String event) {
240         if (!this.alive) {
241             throw new IllegalStateException(this + " is not alive.");
242         }
243
244         synchronized (this) {
245             this.recentEvents.add(event);
246         }
247
248         netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
249
250
251         return broadcast(event);
252     }
253
254
255     @Override
256     public void setFilter(String filter) {
257         if (consumer instanceof FilterableBusConsumer) {
258             ((FilterableBusConsumer) consumer).setFilter(filter);
259
260         } else {
261             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
262         }
263     }
264
265     @Override
266     public String toString() {
267         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
268                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
269                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
270                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
271     }
272
273     /**
274      * {@inheritDoc}
275      */
276     @Override
277     public String getConsumerGroup() {
278         return consumerGroup;
279     }
280
281     /**
282      * {@inheritDoc}
283      */
284     @Override
285     public String getConsumerInstance() {
286         return consumerInstance;
287     }
288
289     /**
290      * {@inheritDoc}
291      */
292     @Override
293     public void shutdown() {
294         this.stop();
295         this.topicListeners.clear();
296     }
297
298     /**
299      * {@inheritDoc}
300      */
301     @Override
302     public int getFetchTimeout() {
303         return fetchTimeout;
304     }
305
306     /**
307      * {@inheritDoc}
308      */
309     @Override
310     public int getFetchLimit() {
311         return fetchLimit;
312     }
313
314 }