3b7851d67fc555f424f0036639000f7ef392d844
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import java.io.IOException;
26 import java.net.MalformedURLException;
27 import java.util.UUID;
28 import lombok.Getter;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
33 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
34 import org.onap.policy.common.utils.network.NetworkUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * This topic source implementation specializes in reading messages over a bus topic source and
40  * notifying its listeners.
41  */
42 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
43         implements Runnable, BusTopicSource {
44
45     /**
46      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
47      * that in a single file in a concise format.
48      */
49     private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
50
51     /**
52      * Bus consumer group.
53      */
54     @Getter
55     protected final String consumerGroup;
56
57     /**
58      * Bus consumer instance.
59      */
60     @Getter
61     protected final String consumerInstance;
62
63     /**
64      * Bus fetch timeout.
65      */
66     @Getter
67     protected final int fetchTimeout;
68
69     /**
70      * Bus fetch limit.
71      */
72     @Getter
73     protected final int fetchLimit;
74
75     /**
76      * Message Bus Consumer.
77      */
78     protected BusConsumer consumer;
79
80     /**
81      * Independent thread reading message over my topic.
82      */
83     protected Thread busPollerThread;
84
85
86     /**
87      * Constructor.
88      *
89      * @param busTopicParams topic parameters
90      *
91      * @throws IllegalArgumentException An invalid parameter passed in
92      */
93     protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
94
95         super(busTopicParams);
96
97         if (busTopicParams.isConsumerGroupInvalid()) {
98             this.consumerGroup = UUID.randomUUID().toString();
99         } else {
100             this.consumerGroup = busTopicParams.getConsumerGroup();
101         }
102
103         if (busTopicParams.isConsumerInstanceInvalid()) {
104             this.consumerInstance = NetworkUtil.getHostname();
105         } else {
106             this.consumerInstance = busTopicParams.getConsumerInstance();
107         }
108
109         if (busTopicParams.getFetchTimeout() <= 0) {
110             this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
111         } else {
112             this.fetchTimeout = busTopicParams.getFetchTimeout();
113         }
114
115         if (busTopicParams.getFetchLimit() <= 0) {
116             this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
117         } else {
118             this.fetchLimit = busTopicParams.getFetchLimit();
119         }
120
121     }
122
123     /**
124      * Initialize the Bus client.
125      */
126     public abstract void init() throws MalformedURLException;
127
128     @Override
129     public void register(TopicListener topicListener) {
130
131         super.register(topicListener);
132
133         try {
134             if (!alive && !locked) {
135                 this.start();
136             } else {
137                 logger.info("{}: register: start not attempted", this);
138             }
139         } catch (Exception e) {
140             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
141         }
142     }
143
144     @Override
145     public void unregister(TopicListener topicListener) {
146         boolean stop;
147         synchronized (this) {
148             super.unregister(topicListener);
149             stop = this.topicListeners.isEmpty();
150         }
151
152         if (stop) {
153             this.stop();
154         }
155     }
156
157     @Override
158     public boolean start() {
159         logger.info("{}: starting", this);
160
161         synchronized (this) {
162
163             if (alive) {
164                 return true;
165             }
166
167             if (locked) {
168                 throw new IllegalStateException(this + " is locked.");
169             }
170
171             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
172
173                 try {
174                     this.init();
175                     this.alive = true;
176                     this.busPollerThread = makePollerThread();
177                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
178                     busPollerThread.start();
179                     return true;
180                 } catch (Exception e) {
181                     throw new IllegalStateException(this + ": cannot start", e);
182                 }
183             }
184         }
185
186         return false;
187     }
188
189     /**
190      * Makes a new thread to be used for polling.
191      *
192      * @return a new Thread
193      */
194     protected Thread makePollerThread() {
195         return new Thread(this);
196     }
197
198     @Override
199     public boolean stop() {
200         logger.info("{}: stopping", this);
201
202         synchronized (this) {
203             BusConsumer consumerCopy = this.consumer;
204
205             this.alive = false;
206             this.consumer = null;
207
208             if (consumerCopy != null) {
209                 try {
210                     consumerCopy.close();
211                 } catch (Exception e) {
212                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
213                 }
214             }
215         }
216
217         Thread.yield();
218
219         return true;
220     }
221
222     /**
223      * Run thread method for the Bus Reader.
224      */
225     @Override
226     public void run() {
227         while (this.alive) {
228             try {
229                 fetchAllMessages();
230             } catch (IOException | RuntimeException e) {
231                 logger.error("{}: cannot fetch", this, e);
232             }
233         }
234
235         logger.info("{}: exiting thread", this);
236     }
237
238     private void fetchAllMessages() throws IOException {
239         for (String event : this.consumer.fetch()) {
240             synchronized (this) {
241                 this.recentEvents.add(event);
242             }
243
244             NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
245
246             broadcast(event);
247
248             if (!this.alive) {
249                 return;
250             }
251         }
252     }
253
254     @Override
255     public boolean offer(String event) {
256         if (!this.alive) {
257             throw new IllegalStateException(this + " is not alive.");
258         }
259
260         synchronized (this) {
261             this.recentEvents.add(event);
262         }
263
264         NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
265
266         return broadcast(event);
267     }
268
269     @Override
270     public String toString() {
271         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
272                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
273                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
274                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
275     }
276
277     @Override
278     public void shutdown() {
279         this.stop();
280         this.topicListeners.clear();
281     }
282 }