e52204f6927a2176b3a96572ee0a6af5757c92fa
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.util.UUID;
27 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
33 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
34 import org.onap.policy.common.utils.network.NetworkUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * This topic source implementation specializes in reading messages over a bus topic source and
40  * notifying its listeners.
41  */
42 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
43         implements Runnable, BusTopicSource, FilterableTopicSource {
44
45     /**
46      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
47      * that in a single file in a concise format.
48      */
49     private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
50
51     /**
52      * Bus consumer group.
53      */
54     protected final String consumerGroup;
55
56     /**
57      * Bus consumer instance.
58      */
59     protected final String consumerInstance;
60
61     /**
62      * Bus fetch timeout.
63      */
64     protected final int fetchTimeout;
65
66     /**
67      * Bus fetch limit.
68      */
69     protected final int fetchLimit;
70
71     /**
72      * Message Bus Consumer.
73      */
74     protected BusConsumer consumer;
75
76     /**
77      * Independent thread reading message over my topic.
78      */
79     protected Thread busPollerThread;
80
81
82     /**
83      * Constructor.
84      *
85      * @param busTopicParams topic parameters
86      *
87      * @throws IllegalArgumentException An invalid parameter passed in
88      */
89     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
90
91         super(busTopicParams);
92
93         if (busTopicParams.isConsumerGroupInvalid()) {
94             this.consumerGroup = UUID.randomUUID().toString();
95         } else {
96             this.consumerGroup = busTopicParams.getConsumerGroup();
97         }
98
99         if (busTopicParams.isConsumerInstanceInvalid()) {
100             this.consumerInstance = NetworkUtil.getHostname();
101         } else {
102             this.consumerInstance = busTopicParams.getConsumerInstance();
103         }
104
105         if (busTopicParams.getFetchTimeout() <= 0) {
106             this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
107         } else {
108             this.fetchTimeout = busTopicParams.getFetchTimeout();
109         }
110
111         if (busTopicParams.getFetchLimit() <= 0) {
112             this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
113         } else {
114             this.fetchLimit = busTopicParams.getFetchLimit();
115         }
116
117     }
118
119     /**
120      * Initialize the Bus client.
121      */
122     public abstract void init() throws MalformedURLException;
123
124     @Override
125     public void register(TopicListener topicListener) {
126
127         super.register(topicListener);
128
129         try {
130             if (!alive && !locked) {
131                 this.start();
132             } else {
133                 logger.info("{}: register: start not attempted", this);
134             }
135         } catch (Exception e) {
136             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
137         }
138     }
139
140     @Override
141     public void unregister(TopicListener topicListener) {
142         boolean stop;
143         synchronized (this) {
144             super.unregister(topicListener);
145             stop = this.topicListeners.isEmpty();
146         }
147
148         if (stop) {
149             this.stop();
150         }
151     }
152
153     @Override
154     public boolean start() {
155         logger.info("{}: starting", this);
156
157         synchronized (this) {
158
159             if (alive) {
160                 return true;
161             }
162
163             if (locked) {
164                 throw new IllegalStateException(this + " is locked.");
165             }
166
167             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
168
169                 try {
170                     this.init();
171                     this.alive = true;
172                     this.busPollerThread = makePollerThread();
173                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
174                     busPollerThread.start();
175                     return true;
176                 } catch (Exception e) {
177                     throw new IllegalStateException(this + ": cannot start", e);
178                 }
179             }
180         }
181
182         return false;
183     }
184
185     /**
186      * Makes a new thread to be used for polling.
187      *
188      * @return a new Thread
189      */
190     protected Thread makePollerThread() {
191         return new Thread(this);
192     }
193
194     @Override
195     public boolean stop() {
196         logger.info("{}: stopping", this);
197
198         synchronized (this) {
199             BusConsumer consumerCopy = this.consumer;
200
201             this.alive = false;
202             this.consumer = null;
203
204             if (consumerCopy != null) {
205                 try {
206                     consumerCopy.close();
207                 } catch (Exception e) {
208                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
209                 }
210             }
211         }
212
213         Thread.yield();
214
215         return true;
216     }
217
218     /**
219      * Run thread method for the Bus Reader.
220      */
221     @Override
222     public void run() {
223         while (this.alive) {
224             try {
225                 fetchAllMessages();
226             } catch (IOException | RuntimeException e) {
227                 logger.error("{}: cannot fetch", this, e);
228             }
229         }
230
231         logger.info("{}: exiting thread", this);
232     }
233
234     private void fetchAllMessages() throws IOException {
235         for (String event : this.consumer.fetch()) {
236             synchronized (this) {
237                 this.recentEvents.add(event);
238             }
239
240             NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
241
242             broadcast(event);
243
244             if (!this.alive) {
245                 return;
246             }
247         }
248     }
249
250     @Override
251     public boolean offer(String event) {
252         if (!this.alive) {
253             throw new IllegalStateException(this + " is not alive.");
254         }
255
256         synchronized (this) {
257             this.recentEvents.add(event);
258         }
259
260         NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
261
262         return broadcast(event);
263     }
264
265
266     @Override
267     public void setFilter(String filter) {
268         if (consumer instanceof FilterableBusConsumer) {
269             ((FilterableBusConsumer) consumer).setFilter(filter);
270
271         } else {
272             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
273         }
274     }
275
276     @Override
277     public String toString() {
278         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
279                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
280                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
281                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
282     }
283
284     @Override
285     public String getConsumerGroup() {
286         return consumerGroup;
287     }
288
289     @Override
290     public String getConsumerInstance() {
291         return consumerInstance;
292     }
293
294     @Override
295     public void shutdown() {
296         this.stop();
297         this.topicListeners.clear();
298     }
299
300     @Override
301     public int getFetchTimeout() {
302         return fetchTimeout;
303     }
304
305     @Override
306     public int getFetchLimit() {
307         return fetchLimit;
308     }
309
310 }