0953465b53c846426ac6cd5a6fd4ba4f2bd8f45f
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.util.UUID;
27
28 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
32 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
33 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
34 import org.onap.policy.common.utils.network.NetworkUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * This topic source implementation specializes in reading messages over a bus topic source and
40  * notifying its listeners.
41  */
42 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
43         implements Runnable, BusTopicSource, FilterableTopicSource {
44
45     /**
46      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
47      * that in a single file in a concise format.
48      */
49     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
50
51     /**
52      * Bus consumer group.
53      */
54     protected final String consumerGroup;
55
56     /**
57      * Bus consumer instance.
58      */
59     protected final String consumerInstance;
60
61     /**
62      * Bus fetch timeout.
63      */
64     protected final int fetchTimeout;
65
66     /**
67      * Bus fetch limit.
68      */
69     protected final int fetchLimit;
70
71     /**
72      * Message Bus Consumer.
73      */
74     protected BusConsumer consumer;
75
76     /**
77      * Independent thread reading message over my topic.
78      */
79     protected Thread busPollerThread;
80
81
82     /**
83      * Constructor.
84      *
85      * @param busTopicParams topic parameters
86      *
87      * @throws IllegalArgumentException An invalid parameter passed in
88      */
89     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
90
91         super(busTopicParams);
92
93         if (busTopicParams.isConsumerGroupInvalid()) {
94             this.consumerGroup = UUID.randomUUID().toString();
95         } else {
96             this.consumerGroup = busTopicParams.getConsumerGroup();
97         }
98
99         if (busTopicParams.isConsumerInstanceInvalid()) {
100             this.consumerInstance = NetworkUtil.getHostname();
101         } else {
102             this.consumerInstance = busTopicParams.getConsumerInstance();
103         }
104
105         if (busTopicParams.getFetchTimeout() <= 0) {
106             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
107         } else {
108             this.fetchTimeout = busTopicParams.getFetchTimeout();
109         }
110
111         if (busTopicParams.getFetchLimit() <= 0) {
112             this.fetchLimit = NO_LIMIT_FETCH;
113         } else {
114             this.fetchLimit = busTopicParams.getFetchLimit();
115         }
116
117     }
118
119     /**
120      * Initialize the Bus client.
121      */
122     public abstract void init() throws MalformedURLException;
123
124     @Override
125     public void register(TopicListener topicListener) {
126
127         super.register(topicListener);
128
129         try {
130             if (!alive && !locked) {
131                 this.start();
132             } else {
133                 logger.info("{}: register: start not attempted", this);
134             }
135         } catch (Exception e) {
136             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
137                     e);
138         }
139     }
140
141     @Override
142     public void unregister(TopicListener topicListener) {
143         boolean stop;
144         synchronized (this) {
145             super.unregister(topicListener);
146             stop = this.topicListeners.isEmpty();
147         }
148
149         if (stop) {
150             this.stop();
151         }
152     }
153
154     @Override
155     public boolean start() {
156         logger.info("{}: starting", this);
157
158         synchronized (this) {
159
160             if (alive) {
161                 return true;
162             }
163
164             if (locked) {
165                 throw new IllegalStateException(this + " is locked.");
166             }
167
168             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
169
170                 try {
171                     this.init();
172                     this.alive = true;
173                     this.busPollerThread = makePollerThread();
174                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
175                     busPollerThread.start();
176                     return true;
177                 } catch (Exception e) {
178                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
179                     throw new IllegalStateException(e);
180                 }
181             }
182         }
183
184         return false;
185     }
186
187     /**
188      * Makes a new thread to be used for polling.
189      *
190      * @return a new Thread
191      */
192     protected Thread makePollerThread() {
193         return new Thread(this);
194     }
195
196     @Override
197     public boolean stop() {
198         logger.info("{}: stopping", this);
199
200         synchronized (this) {
201             BusConsumer consumerCopy = this.consumer;
202
203             this.alive = false;
204             this.consumer = null;
205
206             if (consumerCopy != null) {
207                 try {
208                     consumerCopy.close();
209                 } catch (Exception e) {
210                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
211                 }
212             }
213         }
214
215         Thread.yield();
216
217         return true;
218     }
219
220     /**
221      * Run thread method for the Bus Reader.
222      */
223     @Override
224     public void run() {
225         while (this.alive) {
226             try {
227                 fetchAllMessages();
228             } catch (Exception e) {
229                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
230             }
231         }
232
233         logger.info("{}: exiting thread", this);
234     }
235
236     private void fetchAllMessages() throws InterruptedException, IOException {
237         for (String event : this.consumer.fetch()) {
238             synchronized (this) {
239                 this.recentEvents.add(event);
240             }
241
242             NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
243
244             broadcast(event);
245
246             if (!this.alive) {
247                 return;
248             }
249         }
250     }
251
252     @Override
253     public boolean offer(String event) {
254         if (!this.alive) {
255             throw new IllegalStateException(this + " is not alive.");
256         }
257
258         synchronized (this) {
259             this.recentEvents.add(event);
260         }
261
262         NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
263
264         return broadcast(event);
265     }
266
267
268     @Override
269     public void setFilter(String filter) {
270         if (consumer instanceof FilterableBusConsumer) {
271             ((FilterableBusConsumer) consumer).setFilter(filter);
272
273         } else {
274             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
275         }
276     }
277
278     @Override
279     public String toString() {
280         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
281                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
282                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
283                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
284     }
285
286     @Override
287     public String getConsumerGroup() {
288         return consumerGroup;
289     }
290
291     @Override
292     public String getConsumerInstance() {
293         return consumerInstance;
294     }
295
296     @Override
297     public void shutdown() {
298         this.stop();
299         this.topicListeners.clear();
300     }
301
302     @Override
303     public int getFetchTimeout() {
304         return fetchTimeout;
305     }
306
307     @Override
308     public int getFetchLimit() {
309         return fetchLimit;
310     }
311
312 }