376a62d16ef2937b5b56ef986b565f530345b38d
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import java.io.IOException;
26 import java.net.MalformedURLException;
27 import java.util.UUID;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
31 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
32 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
33 import org.onap.policy.common.utils.network.NetworkUtil;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * This topic source implementation specializes in reading messages over a bus topic source and
39  * notifying its listeners.
40  */
41 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
42         implements Runnable, BusTopicSource {
43
44     /**
45      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
46      * that in a single file in a concise format.
47      */
48     private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
49
50     /**
51      * Bus consumer group.
52      */
53     protected final String consumerGroup;
54
55     /**
56      * Bus consumer instance.
57      */
58     protected final String consumerInstance;
59
60     /**
61      * Bus fetch timeout.
62      */
63     protected final int fetchTimeout;
64
65     /**
66      * Bus fetch limit.
67      */
68     protected final int fetchLimit;
69
70     /**
71      * Message Bus Consumer.
72      */
73     protected BusConsumer consumer;
74
75     /**
76      * Independent thread reading message over my topic.
77      */
78     protected Thread busPollerThread;
79
80
81     /**
82      * Constructor.
83      *
84      * @param busTopicParams topic parameters
85      *
86      * @throws IllegalArgumentException An invalid parameter passed in
87      */
88     protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
89
90         super(busTopicParams);
91
92         if (busTopicParams.isConsumerGroupInvalid()) {
93             this.consumerGroup = UUID.randomUUID().toString();
94         } else {
95             this.consumerGroup = busTopicParams.getConsumerGroup();
96         }
97
98         if (busTopicParams.isConsumerInstanceInvalid()) {
99             this.consumerInstance = NetworkUtil.getHostname();
100         } else {
101             this.consumerInstance = busTopicParams.getConsumerInstance();
102         }
103
104         if (busTopicParams.getFetchTimeout() <= 0) {
105             this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
106         } else {
107             this.fetchTimeout = busTopicParams.getFetchTimeout();
108         }
109
110         if (busTopicParams.getFetchLimit() <= 0) {
111             this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
112         } else {
113             this.fetchLimit = busTopicParams.getFetchLimit();
114         }
115
116     }
117
118     /**
119      * Initialize the Bus client.
120      */
121     public abstract void init() throws MalformedURLException;
122
123     @Override
124     public void register(TopicListener topicListener) {
125
126         super.register(topicListener);
127
128         try {
129             if (!alive && !locked) {
130                 this.start();
131             } else {
132                 logger.info("{}: register: start not attempted", this);
133             }
134         } catch (Exception e) {
135             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
136         }
137     }
138
139     @Override
140     public void unregister(TopicListener topicListener) {
141         boolean stop;
142         synchronized (this) {
143             super.unregister(topicListener);
144             stop = this.topicListeners.isEmpty();
145         }
146
147         if (stop) {
148             this.stop();
149         }
150     }
151
152     @Override
153     public boolean start() {
154         logger.info("{}: starting", this);
155
156         synchronized (this) {
157
158             if (alive) {
159                 return true;
160             }
161
162             if (locked) {
163                 throw new IllegalStateException(this + " is locked.");
164             }
165
166             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
167
168                 try {
169                     this.init();
170                     this.alive = true;
171                     this.busPollerThread = makePollerThread();
172                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
173                     busPollerThread.start();
174                     return true;
175                 } catch (Exception e) {
176                     throw new IllegalStateException(this + ": cannot start", e);
177                 }
178             }
179         }
180
181         return false;
182     }
183
184     /**
185      * Makes a new thread to be used for polling.
186      *
187      * @return a new Thread
188      */
189     protected Thread makePollerThread() {
190         return new Thread(this);
191     }
192
193     @Override
194     public boolean stop() {
195         logger.info("{}: stopping", this);
196
197         synchronized (this) {
198             BusConsumer consumerCopy = this.consumer;
199
200             this.alive = false;
201             this.consumer = null;
202
203             if (consumerCopy != null) {
204                 try {
205                     consumerCopy.close();
206                 } catch (Exception e) {
207                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
208                 }
209             }
210         }
211
212         Thread.yield();
213
214         return true;
215     }
216
217     /**
218      * Run thread method for the Bus Reader.
219      */
220     @Override
221     public void run() {
222         while (this.alive) {
223             try {
224                 fetchAllMessages();
225             } catch (IOException | RuntimeException e) {
226                 logger.error("{}: cannot fetch", this, e);
227             }
228         }
229
230         logger.info("{}: exiting thread", this);
231     }
232
233     private void fetchAllMessages() throws IOException {
234         for (String event : this.consumer.fetch()) {
235             synchronized (this) {
236                 this.recentEvents.add(event);
237             }
238
239             NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
240
241             broadcast(event);
242
243             if (!this.alive) {
244                 return;
245             }
246         }
247     }
248
249     @Override
250     public boolean offer(String event) {
251         if (!this.alive) {
252             throw new IllegalStateException(this + " is not alive.");
253         }
254
255         synchronized (this) {
256             this.recentEvents.add(event);
257         }
258
259         NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
260
261         return broadcast(event);
262     }
263
264     @Override
265     public String toString() {
266         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
267                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
268                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
269                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
270     }
271
272     @Override
273     public String getConsumerGroup() {
274         return consumerGroup;
275     }
276
277     @Override
278     public String getConsumerInstance() {
279         return consumerInstance;
280     }
281
282     @Override
283     public void shutdown() {
284         this.stop();
285         this.topicListeners.clear();
286     }
287
288     @Override
289     public int getFetchTimeout() {
290         return fetchTimeout;
291     }
292
293     @Override
294     public int getFetchLimit() {
295         return fetchLimit;
296     }
297
298 }