4cc8fbb1cc4b85271a387f143c0bd5a556ee07c6
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.UUID;
26
27 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
31 import org.onap.policy.common.utils.network.NetworkUtil;
32 import org.onap.policy.common.utils.slf4j.LoggerFactoryWrapper;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * This topic source implementation specializes in reading messages over a bus topic source and
38  * notifying its listeners.
39  */
40 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
41         implements Runnable, BusTopicSource, FilterableTopicSource {
42
43     /**
44      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
45      * that in a single file in a concise format.
46      */
47     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
48     private static final Logger netLogger = LoggerFactoryWrapper.getNetworkLogger();
49
50     /**
51      * Bus consumer group.
52      */
53     protected final String consumerGroup;
54
55     /**
56      * Bus consumer instance.
57      */
58     protected final String consumerInstance;
59
60     /**
61      * Bus fetch timeout.
62      */
63     protected final int fetchTimeout;
64
65     /**
66      * Bus fetch limit.
67      */
68     protected final int fetchLimit;
69
70     /**
71      * Message Bus Consumer.
72      */
73     protected BusConsumer consumer;
74
75     /**
76      * Independent thread reading message over my topic.
77      */
78     protected Thread busPollerThread;
79
80
81     /**
82      * Constructor.
83      *
84      * @param busTopicParams topic parameters
85      *
86      * @throws IllegalArgumentException An invalid parameter passed in
87      */
88     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
89
90         super(busTopicParams);
91
92         if (busTopicParams.isConsumerGroupInvalid()) {
93             this.consumerGroup = UUID.randomUUID().toString();
94         } else {
95             this.consumerGroup = busTopicParams.getConsumerGroup();
96         }
97
98         if (busTopicParams.isConsumerInstanceInvalid()) {
99             this.consumerInstance = NetworkUtil.getHostname();
100         } else {
101             this.consumerInstance = busTopicParams.getConsumerInstance();
102         }
103
104         if (busTopicParams.getFetchTimeout() <= 0) {
105             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
106         } else {
107             this.fetchTimeout = busTopicParams.getFetchTimeout();
108         }
109
110         if (busTopicParams.getFetchLimit() <= 0) {
111             this.fetchLimit = NO_LIMIT_FETCH;
112         } else {
113             this.fetchLimit = busTopicParams.getFetchLimit();
114         }
115
116     }
117
118     /**
119      * Initialize the Bus client.
120      */
121     public abstract void init() throws MalformedURLException;
122
123     @Override
124     public void register(TopicListener topicListener) {
125
126         super.register(topicListener);
127
128         try {
129             if (!alive && !locked) {
130                 this.start();
131             } else {
132                 logger.info("{}: register: start not attempted", this);
133             }
134         } catch (Exception e) {
135             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
136                     e);
137         }
138     }
139
140     @Override
141     public void unregister(TopicListener topicListener) {
142         boolean stop;
143         synchronized (this) {
144             super.unregister(topicListener);
145             stop = this.topicListeners.isEmpty();
146         }
147
148         if (stop) {
149             this.stop();
150         }
151     }
152
153     @Override
154     public boolean start() {
155         logger.info("{}: starting", this);
156
157         synchronized (this) {
158
159             if (alive) {
160                 return true;
161             }
162
163             if (locked) {
164                 throw new IllegalStateException(this + " is locked.");
165             }
166
167             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
168
169                 try {
170                     this.init();
171                     this.alive = true;
172                     this.busPollerThread = makePollerThread();
173                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
174                     busPollerThread.start();
175                 } catch (Exception e) {
176                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
177                     throw new IllegalStateException(e);
178                 }
179             }
180         }
181
182         return this.alive;
183     }
184
185     /**
186      * Makes a new thread to be used for polling.
187      *
188      * @return a new Thread
189      */
190     protected Thread makePollerThread() {
191         return new Thread(this);
192     }
193
194     @Override
195     public boolean stop() {
196         logger.info("{}: stopping", this);
197
198         synchronized (this) {
199             BusConsumer consumerCopy = this.consumer;
200
201             this.alive = false;
202             this.consumer = null;
203
204             if (consumerCopy != null) {
205                 try {
206                     consumerCopy.close();
207                 } catch (Exception e) {
208                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
209                 }
210             }
211         }
212
213         Thread.yield();
214
215         return true;
216     }
217
218     /**
219      * Run thread method for the Bus Reader.
220      */
221     @Override
222     public void run() {
223         while (this.alive) {
224             try {
225                 for (String event : this.consumer.fetch()) {
226                     synchronized (this) {
227                         this.recentEvents.add(event);
228                     }
229
230                     netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
231                             System.lineSeparator(), event);
232
233                     broadcast(event);
234
235                     if (!this.alive) {
236                         break;
237                     }
238                 }
239             } catch (Exception e) {
240                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
241             }
242         }
243
244         logger.info("{}: exiting thread", this);
245     }
246
247     @Override
248     public boolean offer(String event) {
249         if (!this.alive) {
250             throw new IllegalStateException(this + " is not alive.");
251         }
252
253         synchronized (this) {
254             this.recentEvents.add(event);
255         }
256
257         netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
258
259
260         return broadcast(event);
261     }
262
263
264     @Override
265     public void setFilter(String filter) {
266         if (consumer instanceof FilterableBusConsumer) {
267             ((FilterableBusConsumer) consumer).setFilter(filter);
268
269         } else {
270             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
271         }
272     }
273
274     @Override
275     public String toString() {
276         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
277                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
278                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
279                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
280     }
281
282     @Override
283     public String getConsumerGroup() {
284         return consumerGroup;
285     }
286
287     @Override
288     public String getConsumerInstance() {
289         return consumerInstance;
290     }
291
292     @Override
293     public void shutdown() {
294         this.stop();
295         this.topicListeners.clear();
296     }
297
298     @Override
299     public int getFetchTimeout() {
300         return fetchTimeout;
301     }
302
303     @Override
304     public int getFetchLimit() {
305         return fetchLimit;
306     }
307
308 }