97ebbd500908f40c63f0a07b04abd21bc8368218
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
31 import org.onap.policy.common.utils.network.NetworkUtil;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * This topic source implementation specializes in reading messages over a bus topic source and
37  * notifying its listeners
38  */
39 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
40         implements Runnable, BusTopicSource, FilterableTopicSource {
41
42     /**
43      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
44      * that in a single file in a concise format.
45      */
46     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
47     private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
48
49     /**
50      * Bus consumer group
51      */
52     protected final String consumerGroup;
53
54     /**
55      * Bus consumer instance
56      */
57     protected final String consumerInstance;
58
59     /**
60      * Bus fetch timeout
61      */
62     protected final int fetchTimeout;
63
64     /**
65      * Bus fetch limit
66      */
67     protected final int fetchLimit;
68
69     /**
70      * Message Bus Consumer
71      */
72     protected BusConsumer consumer;
73
74     /**
75      * Independent thread reading message over my topic
76      */
77     protected Thread busPollerThread;
78
79
80     /**
81      * 
82      * @param servers Bus servers
83      * @param topic Bus Topic to be monitored
84      * @param apiKey Bus API Key (optional)
85      * @param apiSecret Bus API Secret (optional)
86      * @param consumerGroup Bus Reader Consumer Group
87      * @param consumerInstance Bus Reader Instance
88      * @param fetchTimeout Bus fetch timeout
89      * @param fetchLimit Bus fetch limit
90      * @param useHttps does the bus use https
91      * @param allowSelfSignedCerts are self-signed certificates allowed
92      * @throws IllegalArgumentException An invalid parameter passed in
93      */
94     public SingleThreadedBusTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
95             String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
96             boolean allowSelfSignedCerts) {
97
98         super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
99
100         if (consumerGroup == null || consumerGroup.isEmpty()) {
101             this.consumerGroup = UUID.randomUUID().toString();
102         } else {
103             this.consumerGroup = consumerGroup;
104         }
105
106         if (consumerInstance == null || consumerInstance.isEmpty()) {
107             this.consumerInstance = NetworkUtil.getHostname();
108         } else {
109             this.consumerInstance = consumerInstance;
110         }
111
112         if (fetchTimeout <= 0) {
113             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
114         } else {
115             this.fetchTimeout = fetchTimeout;
116         }
117
118         if (fetchLimit <= 0) {
119             this.fetchLimit = NO_LIMIT_FETCH;
120         } else {
121             this.fetchLimit = fetchLimit;
122         }
123
124     }
125
126     /**
127      * Initialize the Bus client
128      */
129     public abstract void init() throws MalformedURLException;
130
131     @Override
132     public void register(TopicListener topicListener) {
133
134         super.register(topicListener);
135
136         try {
137             if (!alive && !locked) {
138                 this.start();
139             } else {
140                 logger.info("{}: register: start not attempted", this);
141             }
142         } catch (Exception e) {
143             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
144                     e);
145         }
146     }
147
148     @Override
149     public void unregister(TopicListener topicListener) {
150         boolean stop;
151         synchronized (this) {
152             super.unregister(topicListener);
153             stop = this.topicListeners.isEmpty();
154         }
155
156         if (stop) {
157             this.stop();
158         }
159     }
160
161     @Override
162     public boolean start() {
163         logger.info("{}: starting", this);
164
165         synchronized (this) {
166
167             if (alive) {
168                 return true;
169             }
170
171             if (locked) {
172                 throw new IllegalStateException(this + " is locked.");
173             }
174
175             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
176
177                 try {
178                     this.init();
179                     this.alive = true;
180                     this.busPollerThread = new Thread(this);
181                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
182                     busPollerThread.start();
183                 } catch (Exception e) {
184                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
185                     throw new IllegalStateException(e);
186                 }
187             }
188         }
189
190         return this.alive;
191     }
192
193     @Override
194     public boolean stop() {
195         logger.info("{}: stopping", this);
196
197         synchronized (this) {
198             BusConsumer consumerCopy = this.consumer;
199
200             this.alive = false;
201             this.consumer = null;
202
203             if (consumerCopy != null) {
204                 try {
205                     consumerCopy.close();
206                 } catch (Exception e) {
207                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
208                 }
209             }
210         }
211
212         Thread.yield();
213
214         return true;
215     }
216
217     /**
218      * Run thread method for the Bus Reader
219      */
220     @Override
221     public void run() {
222         while (this.alive) {
223             try {
224                 for (String event : this.consumer.fetch()) {
225                     synchronized (this) {
226                         this.recentEvents.add(event);
227                     }
228
229                     netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
230                             System.lineSeparator(), event);
231
232                     broadcast(event);
233
234                     if (!this.alive) {
235                         break;
236                     }
237                 }
238             } catch (Exception e) {
239                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
240             }
241         }
242
243         logger.info("{}: exiting thread", this);
244     }
245
246     /**
247      * {@inheritDoc}
248      */
249     @Override
250     public boolean offer(String event) {
251         if (!this.alive) {
252             throw new IllegalStateException(this + " is not alive.");
253         }
254
255         synchronized (this) {
256             this.recentEvents.add(event);
257         }
258
259         netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
260
261
262         return broadcast(event);
263     }
264
265
266     @Override
267     public void setFilter(String filter) {
268         if (consumer instanceof FilterableBusConsumer) {
269             ((FilterableBusConsumer) consumer).setFilter(filter);
270
271         } else {
272             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
273         }
274     }
275
276     @Override
277     public String toString() {
278         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
279                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
280                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
281                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
282     }
283
284     /**
285      * {@inheritDoc}
286      */
287     @Override
288     public String getConsumerGroup() {
289         return consumerGroup;
290     }
291
292     /**
293      * {@inheritDoc}
294      */
295     @Override
296     public String getConsumerInstance() {
297         return consumerInstance;
298     }
299
300     /**
301      * {@inheritDoc}
302      */
303     @Override
304     public void shutdown() {
305         this.stop();
306         this.topicListeners.clear();
307     }
308
309     /**
310      * {@inheritDoc}
311      */
312     @Override
313     public int getFetchTimeout() {
314         return fetchTimeout;
315     }
316
317     /**
318      * {@inheritDoc}
319      */
320     @Override
321     public int getFetchLimit() {
322         return fetchLimit;
323     }
324
325 }