83f3760b1590962571475fe3ee494d4fda45444f
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.UUID;
26
27 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
31 import org.onap.policy.common.utils.network.NetworkUtil;
32 import org.onap.policy.common.utils.slf4j.LoggerFactoryWrapper;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * This topic source implementation specializes in reading messages over a bus topic source and
38  * notifying its listeners.
39  */
40 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
41         implements Runnable, BusTopicSource, FilterableTopicSource {
42
43     /**
44      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
45      * that in a single file in a concise format.
46      */
47     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
48     private static final Logger netLogger = LoggerFactoryWrapper.getNetworkLogger();
49
50     /**
51      * Bus consumer group.
52      */
53     protected final String consumerGroup;
54
55     /**
56      * Bus consumer instance.
57      */
58     protected final String consumerInstance;
59
60     /**
61      * Bus fetch timeout.
62      */
63     protected final int fetchTimeout;
64
65     /**
66      * Bus fetch limit.
67      */
68     protected final int fetchLimit;
69
70     /**
71      * Message Bus Consumer.
72      */
73     protected BusConsumer consumer;
74
75     /**
76      * Independent thread reading message over my topic.
77      */
78     protected Thread busPollerThread;
79
80
81     /**
82      * Constructor.
83      *
84      * @param busTopicParams topic parameters
85      *
86      * @throws IllegalArgumentException An invalid parameter passed in
87      */
88     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
89
90         super(busTopicParams);
91
92         if (busTopicParams.isConsumerGroupInvalid()) {
93             this.consumerGroup = UUID.randomUUID().toString();
94         } else {
95             this.consumerGroup = busTopicParams.getConsumerGroup();
96         }
97
98         if (busTopicParams.isConsumerInstanceInvalid()) {
99             this.consumerInstance = NetworkUtil.getHostname();
100         } else {
101             this.consumerInstance = busTopicParams.getConsumerInstance();
102         }
103
104         if (busTopicParams.getFetchTimeout() <= 0) {
105             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
106         } else {
107             this.fetchTimeout = busTopicParams.getFetchTimeout();
108         }
109
110         if (busTopicParams.getFetchLimit() <= 0) {
111             this.fetchLimit = NO_LIMIT_FETCH;
112         } else {
113             this.fetchLimit = busTopicParams.getFetchLimit();
114         }
115
116     }
117
118     /**
119      * Initialize the Bus client.
120      */
121     public abstract void init() throws MalformedURLException;
122
123     @Override
124     public void register(TopicListener topicListener) {
125
126         super.register(topicListener);
127
128         try {
129             if (!alive && !locked) {
130                 this.start();
131             } else {
132                 logger.info("{}: register: start not attempted", this);
133             }
134         } catch (Exception e) {
135             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
136                     e);
137         }
138     }
139
140     @Override
141     public void unregister(TopicListener topicListener) {
142         boolean stop;
143         synchronized (this) {
144             super.unregister(topicListener);
145             stop = this.topicListeners.isEmpty();
146         }
147
148         if (stop) {
149             this.stop();
150         }
151     }
152
153     @Override
154     public boolean start() {
155         logger.info("{}: starting", this);
156
157         synchronized (this) {
158
159             if (alive) {
160                 return true;
161             }
162
163             if (locked) {
164                 throw new IllegalStateException(this + " is locked.");
165             }
166
167             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
168
169                 try {
170                     this.init();
171                     this.alive = true;
172                     this.busPollerThread = makePollerThread();
173                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
174                     busPollerThread.start();
175                     return true;
176                 } catch (Exception e) {
177                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
178                     throw new IllegalStateException(e);
179                 }
180             }
181         }
182
183         return false;
184     }
185
186     /**
187      * Makes a new thread to be used for polling.
188      *
189      * @return a new Thread
190      */
191     protected Thread makePollerThread() {
192         return new Thread(this);
193     }
194
195     @Override
196     public boolean stop() {
197         logger.info("{}: stopping", this);
198
199         synchronized (this) {
200             BusConsumer consumerCopy = this.consumer;
201
202             this.alive = false;
203             this.consumer = null;
204
205             if (consumerCopy != null) {
206                 try {
207                     consumerCopy.close();
208                 } catch (Exception e) {
209                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
210                 }
211             }
212         }
213
214         Thread.yield();
215
216         return true;
217     }
218
219     /**
220      * Run thread method for the Bus Reader.
221      */
222     @Override
223     public void run() {
224         while (this.alive) {
225             try {
226                 for (String event : this.consumer.fetch()) {
227                     synchronized (this) {
228                         this.recentEvents.add(event);
229                     }
230
231                     netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
232                             System.lineSeparator(), event);
233
234                     broadcast(event);
235
236                     if (!this.alive) {
237                         break;
238                     }
239                 }
240             } catch (Exception e) {
241                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
242             }
243         }
244
245         logger.info("{}: exiting thread", this);
246     }
247
248     @Override
249     public boolean offer(String event) {
250         if (!this.alive) {
251             throw new IllegalStateException(this + " is not alive.");
252         }
253
254         synchronized (this) {
255             this.recentEvents.add(event);
256         }
257
258         netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
259
260
261         return broadcast(event);
262     }
263
264
265     @Override
266     public void setFilter(String filter) {
267         if (consumer instanceof FilterableBusConsumer) {
268             ((FilterableBusConsumer) consumer).setFilter(filter);
269
270         } else {
271             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
272         }
273     }
274
275     @Override
276     public String toString() {
277         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
278                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
279                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
280                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
281     }
282
283     @Override
284     public String getConsumerGroup() {
285         return consumerGroup;
286     }
287
288     @Override
289     public String getConsumerInstance() {
290         return consumerInstance;
291     }
292
293     @Override
294     public void shutdown() {
295         this.stop();
296         this.topicListeners.clear();
297     }
298
299     @Override
300     public int getFetchTimeout() {
301         return fetchTimeout;
302     }
303
304     @Override
305     public int getFetchLimit() {
306         return fetchLimit;
307     }
308
309 }