6e74694814773bf98350c5c59e3acd3fe6ee3a02
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.io.IOException;
25 import java.net.MalformedURLException;
26 import java.util.UUID;
27 import org.onap.policy.common.endpoints.event.comm.TopicListener;
28 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
29 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
30 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
31 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
32 import org.onap.policy.common.utils.network.NetworkUtil;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * This topic source implementation specializes in reading messages over a bus topic source and
38  * notifying its listeners.
39  */
40 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
41         implements Runnable, BusTopicSource {
42
43     /**
44      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
45      * that in a single file in a concise format.
46      */
47     private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
48
49     /**
50      * Bus consumer group.
51      */
52     protected final String consumerGroup;
53
54     /**
55      * Bus consumer instance.
56      */
57     protected final String consumerInstance;
58
59     /**
60      * Bus fetch timeout.
61      */
62     protected final int fetchTimeout;
63
64     /**
65      * Bus fetch limit.
66      */
67     protected final int fetchLimit;
68
69     /**
70      * Message Bus Consumer.
71      */
72     protected BusConsumer consumer;
73
74     /**
75      * Independent thread reading message over my topic.
76      */
77     protected Thread busPollerThread;
78
79
80     /**
81      * Constructor.
82      *
83      * @param busTopicParams topic parameters
84      *
85      * @throws IllegalArgumentException An invalid parameter passed in
86      */
87     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
88
89         super(busTopicParams);
90
91         if (busTopicParams.isConsumerGroupInvalid()) {
92             this.consumerGroup = UUID.randomUUID().toString();
93         } else {
94             this.consumerGroup = busTopicParams.getConsumerGroup();
95         }
96
97         if (busTopicParams.isConsumerInstanceInvalid()) {
98             this.consumerInstance = NetworkUtil.getHostname();
99         } else {
100             this.consumerInstance = busTopicParams.getConsumerInstance();
101         }
102
103         if (busTopicParams.getFetchTimeout() <= 0) {
104             this.fetchTimeout = PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH;
105         } else {
106             this.fetchTimeout = busTopicParams.getFetchTimeout();
107         }
108
109         if (busTopicParams.getFetchLimit() <= 0) {
110             this.fetchLimit = PolicyEndPointProperties.NO_LIMIT_FETCH;
111         } else {
112             this.fetchLimit = busTopicParams.getFetchLimit();
113         }
114
115     }
116
117     /**
118      * Initialize the Bus client.
119      */
120     public abstract void init() throws MalformedURLException;
121
122     @Override
123     public void register(TopicListener topicListener) {
124
125         super.register(topicListener);
126
127         try {
128             if (!alive && !locked) {
129                 this.start();
130             } else {
131                 logger.info("{}: register: start not attempted", this);
132             }
133         } catch (Exception e) {
134             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
135         }
136     }
137
138     @Override
139     public void unregister(TopicListener topicListener) {
140         boolean stop;
141         synchronized (this) {
142             super.unregister(topicListener);
143             stop = this.topicListeners.isEmpty();
144         }
145
146         if (stop) {
147             this.stop();
148         }
149     }
150
151     @Override
152     public boolean start() {
153         logger.info("{}: starting", this);
154
155         synchronized (this) {
156
157             if (alive) {
158                 return true;
159             }
160
161             if (locked) {
162                 throw new IllegalStateException(this + " is locked.");
163             }
164
165             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
166
167                 try {
168                     this.init();
169                     this.alive = true;
170                     this.busPollerThread = makePollerThread();
171                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
172                     busPollerThread.start();
173                     return true;
174                 } catch (Exception e) {
175                     throw new IllegalStateException(this + ": cannot start", e);
176                 }
177             }
178         }
179
180         return false;
181     }
182
183     /**
184      * Makes a new thread to be used for polling.
185      *
186      * @return a new Thread
187      */
188     protected Thread makePollerThread() {
189         return new Thread(this);
190     }
191
192     @Override
193     public boolean stop() {
194         logger.info("{}: stopping", this);
195
196         synchronized (this) {
197             BusConsumer consumerCopy = this.consumer;
198
199             this.alive = false;
200             this.consumer = null;
201
202             if (consumerCopy != null) {
203                 try {
204                     consumerCopy.close();
205                 } catch (Exception e) {
206                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
207                 }
208             }
209         }
210
211         Thread.yield();
212
213         return true;
214     }
215
216     /**
217      * Run thread method for the Bus Reader.
218      */
219     @Override
220     public void run() {
221         while (this.alive) {
222             try {
223                 fetchAllMessages();
224             } catch (IOException | RuntimeException e) {
225                 logger.error("{}: cannot fetch", this, e);
226             }
227         }
228
229         logger.info("{}: exiting thread", this);
230     }
231
232     private void fetchAllMessages() throws IOException {
233         for (String event : this.consumer.fetch()) {
234             synchronized (this) {
235                 this.recentEvents.add(event);
236             }
237
238             NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
239
240             broadcast(event);
241
242             if (!this.alive) {
243                 return;
244             }
245         }
246     }
247
248     @Override
249     public boolean offer(String event) {
250         if (!this.alive) {
251             throw new IllegalStateException(this + " is not alive.");
252         }
253
254         synchronized (this) {
255             this.recentEvents.add(event);
256         }
257
258         NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
259
260         return broadcast(event);
261     }
262
263     @Override
264     public String toString() {
265         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
266                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
267                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
268                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
269     }
270
271     @Override
272     public String getConsumerGroup() {
273         return consumerGroup;
274     }
275
276     @Override
277     public String getConsumerInstance() {
278         return consumerInstance;
279     }
280
281     @Override
282     public void shutdown() {
283         this.stop();
284         this.topicListeners.clear();
285     }
286
287     @Override
288     public int getFetchTimeout() {
289         return fetchTimeout;
290     }
291
292     @Override
293     public int getFetchLimit() {
294         return fetchLimit;
295     }
296
297 }