164f2b16bf83c98d6a8b47c3e04904330df78cdc
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.util.UUID;
27
28 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
34 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
35 import org.onap.policy.common.utils.network.NetworkUtil;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * This topic source implementation specializes in reading messages over a bus topic source and
41  * notifying its listeners.
42  */
43 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
44         implements Runnable, BusTopicSource, FilterableTopicSource {
45
46     /**
47      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
48      * that in a single file in a concise format.
49      */
50     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
51
52     /**
53      * Bus consumer group.
54      */
55     protected final String consumerGroup;
56
57     /**
58      * Bus consumer instance.
59      */
60     protected final String consumerInstance;
61
62     /**
63      * Bus fetch timeout.
64      */
65     protected final int fetchTimeout;
66
67     /**
68      * Bus fetch limit.
69      */
70     protected final int fetchLimit;
71
72     /**
73      * Message Bus Consumer.
74      */
75     protected BusConsumer consumer;
76
77     /**
78      * Independent thread reading message over my topic.
79      */
80     protected Thread busPollerThread;
81
82
83     /**
84      * Constructor.
85      *
86      * @param busTopicParams topic parameters
87      *
88      * @throws IllegalArgumentException An invalid parameter passed in
89      */
90     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
91
92         super(busTopicParams);
93
94         if (busTopicParams.isConsumerGroupInvalid()) {
95             this.consumerGroup = UUID.randomUUID().toString();
96         } else {
97             this.consumerGroup = busTopicParams.getConsumerGroup();
98         }
99
100         if (busTopicParams.isConsumerInstanceInvalid()) {
101             this.consumerInstance = NetworkUtil.getHostname();
102         } else {
103             this.consumerInstance = busTopicParams.getConsumerInstance();
104         }
105
106         if (busTopicParams.getFetchTimeout() <= 0) {
107             this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
108         } else {
109             this.fetchTimeout = busTopicParams.getFetchTimeout();
110         }
111
112         if (busTopicParams.getFetchLimit() <= 0) {
113             this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
114         } else {
115             this.fetchLimit = busTopicParams.getFetchLimit();
116         }
117
118     }
119
120     /**
121      * Initialize the Bus client.
122      */
123     public abstract void init() throws MalformedURLException;
124
125     @Override
126     public void register(TopicListener topicListener) {
127
128         super.register(topicListener);
129
130         try {
131             if (!alive && !locked) {
132                 this.start();
133             } else {
134                 logger.info("{}: register: start not attempted", this);
135             }
136         } catch (Exception e) {
137             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
138                     e);
139         }
140     }
141
142     @Override
143     public void unregister(TopicListener topicListener) {
144         boolean stop;
145         synchronized (this) {
146             super.unregister(topicListener);
147             stop = this.topicListeners.isEmpty();
148         }
149
150         if (stop) {
151             this.stop();
152         }
153     }
154
155     @Override
156     public boolean start() {
157         logger.info("{}: starting", this);
158
159         synchronized (this) {
160
161             if (alive) {
162                 return true;
163             }
164
165             if (locked) {
166                 throw new IllegalStateException(this + " is locked.");
167             }
168
169             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
170
171                 try {
172                     this.init();
173                     this.alive = true;
174                     this.busPollerThread = makePollerThread();
175                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
176                     busPollerThread.start();
177                     return true;
178                 } catch (Exception e) {
179                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
180                     throw new IllegalStateException(e);
181                 }
182             }
183         }
184
185         return false;
186     }
187
188     /**
189      * Makes a new thread to be used for polling.
190      *
191      * @return a new Thread
192      */
193     protected Thread makePollerThread() {
194         return new Thread(this);
195     }
196
197     @Override
198     public boolean stop() {
199         logger.info("{}: stopping", this);
200
201         synchronized (this) {
202             BusConsumer consumerCopy = this.consumer;
203
204             this.alive = false;
205             this.consumer = null;
206
207             if (consumerCopy != null) {
208                 try {
209                     consumerCopy.close();
210                 } catch (Exception e) {
211                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
212                 }
213             }
214         }
215
216         Thread.yield();
217
218         return true;
219     }
220
221     /**
222      * Run thread method for the Bus Reader.
223      */
224     @Override
225     public void run() {
226         while (this.alive) {
227             try {
228                 fetchAllMessages();
229             } catch (IOException | RuntimeException e) {
230                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
231             }
232         }
233
234         logger.info("{}: exiting thread", this);
235     }
236
237     private void fetchAllMessages() throws IOException {
238         for (String event : this.consumer.fetch()) {
239             synchronized (this) {
240                 this.recentEvents.add(event);
241             }
242
243             NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
244
245             broadcast(event);
246
247             if (!this.alive) {
248                 return;
249             }
250         }
251     }
252
253     @Override
254     public boolean offer(String event) {
255         if (!this.alive) {
256             throw new IllegalStateException(this + " is not alive.");
257         }
258
259         synchronized (this) {
260             this.recentEvents.add(event);
261         }
262
263         NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
264
265         return broadcast(event);
266     }
267
268
269     @Override
270     public void setFilter(String filter) {
271         if (consumer instanceof FilterableBusConsumer) {
272             ((FilterableBusConsumer) consumer).setFilter(filter);
273
274         } else {
275             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
276         }
277     }
278
279     @Override
280     public String toString() {
281         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
282                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
283                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
284                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
285     }
286
287     @Override
288     public String getConsumerGroup() {
289         return consumerGroup;
290     }
291
292     @Override
293     public String getConsumerInstance() {
294         return consumerInstance;
295     }
296
297     @Override
298     public void shutdown() {
299         this.stop();
300         this.topicListeners.clear();
301     }
302
303     @Override
304     public int getFetchTimeout() {
305         return fetchTimeout;
306     }
307
308     @Override
309     public int getFetchLimit() {
310         return fetchLimit;
311     }
312
313 }