510ddaaaf4849b3584e14c5b04d6a7c55f1929be
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.util.UUID;
27
28 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
34 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
35 import org.onap.policy.common.utils.network.NetworkUtil;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * This topic source implementation specializes in reading messages over a bus topic source and
41  * notifying its listeners.
42  */
43 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
44         implements Runnable, BusTopicSource, FilterableTopicSource {
45
46     /**
47      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
48      * that in a single file in a concise format.
49      */
50     private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
51
52     /**
53      * Bus consumer group.
54      */
55     protected final String consumerGroup;
56
57     /**
58      * Bus consumer instance.
59      */
60     protected final String consumerInstance;
61
62     /**
63      * Bus fetch timeout.
64      */
65     protected final int fetchTimeout;
66
67     /**
68      * Bus fetch limit.
69      */
70     protected final int fetchLimit;
71
72     /**
73      * Message Bus Consumer.
74      */
75     protected BusConsumer consumer;
76
77     /**
78      * Independent thread reading message over my topic.
79      */
80     protected Thread busPollerThread;
81
82
83     /**
84      * Constructor.
85      *
86      * @param busTopicParams topic parameters
87      *
88      * @throws IllegalArgumentException An invalid parameter passed in
89      */
90     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
91
92         super(busTopicParams);
93
94         if (busTopicParams.isConsumerGroupInvalid()) {
95             this.consumerGroup = UUID.randomUUID().toString();
96         } else {
97             this.consumerGroup = busTopicParams.getConsumerGroup();
98         }
99
100         if (busTopicParams.isConsumerInstanceInvalid()) {
101             this.consumerInstance = NetworkUtil.getHostname();
102         } else {
103             this.consumerInstance = busTopicParams.getConsumerInstance();
104         }
105
106         if (busTopicParams.getFetchTimeout() <= 0) {
107             this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
108         } else {
109             this.fetchTimeout = busTopicParams.getFetchTimeout();
110         }
111
112         if (busTopicParams.getFetchLimit() <= 0) {
113             this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
114         } else {
115             this.fetchLimit = busTopicParams.getFetchLimit();
116         }
117
118     }
119
120     /**
121      * Initialize the Bus client.
122      */
123     public abstract void init() throws MalformedURLException;
124
125     @Override
126     public void register(TopicListener topicListener) {
127
128         super.register(topicListener);
129
130         try {
131             if (!alive && !locked) {
132                 this.start();
133             } else {
134                 logger.info("{}: register: start not attempted", this);
135             }
136         } catch (Exception e) {
137             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
138         }
139     }
140
141     @Override
142     public void unregister(TopicListener topicListener) {
143         boolean stop;
144         synchronized (this) {
145             super.unregister(topicListener);
146             stop = this.topicListeners.isEmpty();
147         }
148
149         if (stop) {
150             this.stop();
151         }
152     }
153
154     @Override
155     public boolean start() {
156         logger.info("{}: starting", this);
157
158         synchronized (this) {
159
160             if (alive) {
161                 return true;
162             }
163
164             if (locked) {
165                 throw new IllegalStateException(this + " is locked.");
166             }
167
168             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
169
170                 try {
171                     this.init();
172                     this.alive = true;
173                     this.busPollerThread = makePollerThread();
174                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
175                     busPollerThread.start();
176                     return true;
177                 } catch (Exception e) {
178                     throw new IllegalStateException(this + ": cannot start", e);
179                 }
180             }
181         }
182
183         return false;
184     }
185
186     /**
187      * Makes a new thread to be used for polling.
188      *
189      * @return a new Thread
190      */
191     protected Thread makePollerThread() {
192         return new Thread(this);
193     }
194
195     @Override
196     public boolean stop() {
197         logger.info("{}: stopping", this);
198
199         synchronized (this) {
200             BusConsumer consumerCopy = this.consumer;
201
202             this.alive = false;
203             this.consumer = null;
204
205             if (consumerCopy != null) {
206                 try {
207                     consumerCopy.close();
208                 } catch (Exception e) {
209                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
210                 }
211             }
212         }
213
214         Thread.yield();
215
216         return true;
217     }
218
219     /**
220      * Run thread method for the Bus Reader.
221      */
222     @Override
223     public void run() {
224         while (this.alive) {
225             try {
226                 fetchAllMessages();
227             } catch (IOException | RuntimeException e) {
228                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
229             }
230         }
231
232         logger.info("{}: exiting thread", this);
233     }
234
235     private void fetchAllMessages() throws IOException {
236         for (String event : this.consumer.fetch()) {
237             synchronized (this) {
238                 this.recentEvents.add(event);
239             }
240
241             NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
242
243             broadcast(event);
244
245             if (!this.alive) {
246                 return;
247             }
248         }
249     }
250
251     @Override
252     public boolean offer(String event) {
253         if (!this.alive) {
254             throw new IllegalStateException(this + " is not alive.");
255         }
256
257         synchronized (this) {
258             this.recentEvents.add(event);
259         }
260
261         NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
262
263         return broadcast(event);
264     }
265
266
267     @Override
268     public void setFilter(String filter) {
269         if (consumer instanceof FilterableBusConsumer) {
270             ((FilterableBusConsumer) consumer).setFilter(filter);
271
272         } else {
273             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
274         }
275     }
276
277     @Override
278     public String toString() {
279         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
280                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
281                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
282                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
283     }
284
285     @Override
286     public String getConsumerGroup() {
287         return consumerGroup;
288     }
289
290     @Override
291     public String getConsumerInstance() {
292         return consumerInstance;
293     }
294
295     @Override
296     public void shutdown() {
297         this.stop();
298         this.topicListeners.clear();
299     }
300
301     @Override
302     public int getFetchTimeout() {
303         return fetchTimeout;
304     }
305
306     @Override
307     public int getFetchLimit() {
308         return fetchLimit;
309     }
310
311 }