b588d1f3af4646b8ddcd95637261b9f4c0365005
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.UUID;
26
27 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
31 import org.onap.policy.common.utils.network.NetworkUtil;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This topic source implementation specializes in reading messages over a bus topic source and
37  * notifying its listeners.
38  */
39 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
40         implements Runnable, BusTopicSource, FilterableTopicSource {
41
42     /**
43      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
44      * that in a single file in a concise format.
45      */
46     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
47     private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
48
49     /**
50      * Bus consumer group.
51      */
52     protected final String consumerGroup;
53
54     /**
55      * Bus consumer instance.
56      */
57     protected final String consumerInstance;
58
59     /**
60      * Bus fetch timeout.
61      */
62     protected final int fetchTimeout;
63
64     /**
65      * Bus fetch limit.
66      */
67     protected final int fetchLimit;
68
69     /**
70      * Message Bus Consumer.
71      */
72     protected BusConsumer consumer;
73
74     /**
75      * Independent thread reading message over my topic.
76      */
77     protected Thread busPollerThread;
78
79
80     /**
81      * Constructor.
82      *
83      * @param busTopicParams topic parameters
84      * 
85      * @throws IllegalArgumentException An invalid parameter passed in
86      */
87     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
88
89         super(busTopicParams);
90
91         if (busTopicParams.isConsumerGroupInvalid()) {
92             this.consumerGroup = UUID.randomUUID().toString();
93         } else {
94             this.consumerGroup = busTopicParams.getConsumerGroup();
95         }
96
97         if (busTopicParams.isConsumerInstanceInvalid()) {
98             this.consumerInstance = NetworkUtil.getHostname();
99         } else {
100             this.consumerInstance = busTopicParams.getConsumerInstance();
101         }
102
103         if (busTopicParams.getFetchTimeout() <= 0) {
104             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
105         } else {
106             this.fetchTimeout = busTopicParams.getFetchTimeout();
107         }
108
109         if (busTopicParams.getFetchLimit() <= 0) {
110             this.fetchLimit = NO_LIMIT_FETCH;
111         } else {
112             this.fetchLimit = busTopicParams.getFetchLimit();
113         }
114
115     }
116
117     /**
118      * Initialize the Bus client.
119      */
120     public abstract void init() throws MalformedURLException;
121
122     @Override
123     public void register(TopicListener topicListener) {
124
125         super.register(topicListener);
126
127         try {
128             if (!alive && !locked) {
129                 this.start();
130             } else {
131                 logger.info("{}: register: start not attempted", this);
132             }
133         } catch (Exception e) {
134             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
135                     e);
136         }
137     }
138
139     @Override
140     public void unregister(TopicListener topicListener) {
141         boolean stop;
142         synchronized (this) {
143             super.unregister(topicListener);
144             stop = this.topicListeners.isEmpty();
145         }
146
147         if (stop) {
148             this.stop();
149         }
150     }
151
152     @Override
153     public boolean start() {
154         logger.info("{}: starting", this);
155
156         synchronized (this) {
157
158             if (alive) {
159                 return true;
160             }
161
162             if (locked) {
163                 throw new IllegalStateException(this + " is locked.");
164             }
165
166             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
167
168                 try {
169                     this.init();
170                     this.alive = true;
171                     this.busPollerThread = makePollerThread();
172                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
173                     busPollerThread.start();
174                 } catch (Exception e) {
175                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
176                     throw new IllegalStateException(e);
177                 }
178             }
179         }
180
181         return this.alive;
182     }
183
184     /**
185      * Makes a new thread to be used for polling.
186      * 
187      * @return a new Thread
188      */
189     protected Thread makePollerThread() {
190         return new Thread(this);
191     }
192
193     @Override
194     public boolean stop() {
195         logger.info("{}: stopping", this);
196
197         synchronized (this) {
198             BusConsumer consumerCopy = this.consumer;
199
200             this.alive = false;
201             this.consumer = null;
202
203             if (consumerCopy != null) {
204                 try {
205                     consumerCopy.close();
206                 } catch (Exception e) {
207                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
208                 }
209             }
210         }
211
212         Thread.yield();
213
214         return true;
215     }
216
217     /**
218      * Run thread method for the Bus Reader.
219      */
220     @Override
221     public void run() {
222         while (this.alive) {
223             try {
224                 for (String event : this.consumer.fetch()) {
225                     synchronized (this) {
226                         this.recentEvents.add(event);
227                     }
228
229                     netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
230                             System.lineSeparator(), event);
231
232                     broadcast(event);
233
234                     if (!this.alive) {
235                         break;
236                     }
237                 }
238             } catch (Exception e) {
239                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
240             }
241         }
242
243         logger.info("{}: exiting thread", this);
244     }
245
246     @Override
247     public boolean offer(String event) {
248         if (!this.alive) {
249             throw new IllegalStateException(this + " is not alive.");
250         }
251
252         synchronized (this) {
253             this.recentEvents.add(event);
254         }
255
256         netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
257
258
259         return broadcast(event);
260     }
261
262
263     @Override
264     public void setFilter(String filter) {
265         if (consumer instanceof FilterableBusConsumer) {
266             ((FilterableBusConsumer) consumer).setFilter(filter);
267
268         } else {
269             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
270         }
271     }
272
273     @Override
274     public String toString() {
275         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
276                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
277                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
278                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
279     }
280
281     @Override
282     public String getConsumerGroup() {
283         return consumerGroup;
284     }
285
286     @Override
287     public String getConsumerInstance() {
288         return consumerInstance;
289     }
290
291     @Override
292     public void shutdown() {
293         this.stop();
294         this.topicListeners.clear();
295     }
296
297     @Override
298     public int getFetchTimeout() {
299         return fetchTimeout;
300     }
301
302     @Override
303     public int getFetchLimit() {
304         return fetchLimit;
305     }
306
307 }