f98b481feaae66773136f022ccfabc0ce8650015
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
24
25 import java.io.IOException;
26 import java.net.MalformedURLException;
27 import java.util.UUID;
28 import lombok.Getter;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
33 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
34 import org.onap.policy.common.utils.network.NetworkUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * This topic source implementation specializes in reading messages over a bus topic source and
40  * notifying its listeners.
41  */
42 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
43         implements Runnable, BusTopicSource {
44
45     /**
46      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
47      * that in a single file in a concise format.
48      */
49     private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
50
51     /**
52      * Bus consumer group.
53      */
54     @Getter
55     protected final String consumerGroup;
56
57     /**
58      * Bus consumer instance.
59      */
60     @Getter
61     protected final String consumerInstance;
62
63     /**
64      * Bus fetch timeout.
65      */
66     @Getter
67     protected final int fetchTimeout;
68
69     /**
70      * Bus fetch limit.
71      */
72     @Getter
73     protected final int fetchLimit;
74
75     /**
76      * Message Bus Consumer.
77      */
78     protected BusConsumer consumer;
79
80     /**
81      * Independent thread reading message over my topic.
82      */
83     protected Thread busPollerThread;
84
85
86     /**
87      * Constructor.
88      *
89      * @param busTopicParams topic parameters
90      *
91      * @throws IllegalArgumentException An invalid parameter passed in
92      */
93     protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
94
95         super(busTopicParams);
96
97         if (busTopicParams.isConsumerGroupInvalid() && busTopicParams.isConsumerInstanceInvalid()) {
98             this.consumerGroup = UUID.randomUUID().toString();
99             this.consumerInstance = NetworkUtil.getHostname();
100
101         } else if (busTopicParams.isConsumerGroupInvalid()) {
102             this.consumerGroup = UUID.randomUUID().toString();
103             this.consumerInstance = busTopicParams.getConsumerInstance();
104
105         } else if (busTopicParams.isConsumerInstanceInvalid()) {
106             this.consumerGroup = busTopicParams.getConsumerGroup();
107             this.consumerInstance = UUID.randomUUID().toString();
108
109         } else {
110             this.consumerGroup = busTopicParams.getConsumerGroup();
111             this.consumerInstance = busTopicParams.getConsumerInstance();
112         }
113
114         if (busTopicParams.getFetchTimeout() <= 0) {
115             this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
116         } else {
117             this.fetchTimeout = busTopicParams.getFetchTimeout();
118         }
119
120         if (busTopicParams.getFetchLimit() <= 0) {
121             this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
122         } else {
123             this.fetchLimit = busTopicParams.getFetchLimit();
124         }
125
126     }
127
128     /**
129      * Initialize the Bus client.
130      */
131     public abstract void init() throws MalformedURLException;
132
133     @Override
134     public void register(TopicListener topicListener) {
135
136         super.register(topicListener);
137
138         try {
139             if (!alive && !locked) {
140                 this.start();
141             } else {
142                 logger.info("{}: register: start not attempted", this);
143             }
144         } catch (Exception e) {
145             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
146         }
147     }
148
149     @Override
150     public void unregister(TopicListener topicListener) {
151         boolean stop;
152         synchronized (this) {
153             super.unregister(topicListener);
154             stop = this.topicListeners.isEmpty();
155         }
156
157         if (stop) {
158             this.stop();
159         }
160     }
161
162     @Override
163     public boolean start() {
164         logger.info("{}: starting", this);
165
166         synchronized (this) {
167
168             if (alive) {
169                 return true;
170             }
171
172             if (locked) {
173                 throw new IllegalStateException(this + " is locked.");
174             }
175
176             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
177
178                 try {
179                     this.init();
180                     this.alive = true;
181                     this.busPollerThread = makePollerThread();
182                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
183                     busPollerThread.start();
184                     return true;
185                 } catch (Exception e) {
186                     throw new IllegalStateException(this + ": cannot start", e);
187                 }
188             }
189         }
190
191         return false;
192     }
193
194     /**
195      * Makes a new thread to be used for polling.
196      *
197      * @return a new Thread
198      */
199     protected Thread makePollerThread() {
200         return new Thread(this);
201     }
202
203     @Override
204     public boolean stop() {
205         logger.info("{}: stopping", this);
206
207         synchronized (this) {
208             BusConsumer consumerCopy = this.consumer;
209
210             this.alive = false;
211             this.consumer = null;
212
213             if (consumerCopy != null) {
214                 try {
215                     consumerCopy.close();
216                 } catch (Exception e) {
217                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
218                 }
219             }
220         }
221
222         Thread.yield();
223
224         return true;
225     }
226
227     /**
228      * Run thread method for the Bus Reader.
229      */
230     @Override
231     public void run() {
232         while (this.alive) {
233             try {
234                 fetchAllMessages();
235             } catch (IOException | RuntimeException e) {
236                 logger.error("{}: cannot fetch", this, e);
237             }
238         }
239
240         logger.info("{}: exiting thread", this);
241     }
242
243     private void fetchAllMessages() throws IOException {
244         for (String event : this.consumer.fetch()) {
245             synchronized (this) {
246                 this.recentEvents.add(event);
247             }
248
249             NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
250
251             broadcast(event);
252
253             if (!this.alive) {
254                 return;
255             }
256         }
257     }
258
259     @Override
260     public boolean offer(String event) {
261         if (!this.alive) {
262             throw new IllegalStateException(this + " is not alive.");
263         }
264
265         synchronized (this) {
266             this.recentEvents.add(event);
267         }
268
269         NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
270
271         return broadcast(event);
272     }
273
274     @Override
275     public String toString() {
276         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
277                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
278                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
279                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
280     }
281
282     @Override
283     public void shutdown() {
284         this.stop();
285         this.topicListeners.clear();
286     }
287 }