d59c63f5ec486172c4d72aedc23b6002ecc80ccf
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.UUID;
26
27 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
30 import org.onap.policy.common.utils.network.NetworkUtil;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * This topic source implementation specializes in reading messages over a bus topic source and
36  * notifying its listeners
37  */
38 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
39         implements Runnable, BusTopicSource, FilterableTopicSource {
40
41     /**
42      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
43      * that in a single file in a concise format.
44      */
45     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
46     private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
47
48     /**
49      * Bus consumer group
50      */
51     protected final String consumerGroup;
52
53     /**
54      * Bus consumer instance
55      */
56     protected final String consumerInstance;
57
58     /**
59      * Bus fetch timeout
60      */
61     protected final int fetchTimeout;
62
63     /**
64      * Bus fetch limit
65      */
66     protected final int fetchLimit;
67
68     /**
69      * Message Bus Consumer
70      */
71     protected BusConsumer consumer;
72
73     /**
74      * Independent thread reading message over my topic
75      */
76     protected Thread busPollerThread;
77
78
79     /**
80      * 
81      * @param servers Bus servers
82      * @param topic Bus Topic to be monitored
83      * @param apiKey Bus API Key (optional)
84      * @param apiSecret Bus API Secret (optional)
85      * @param consumerGroup Bus Reader Consumer Group
86      * @param consumerInstance Bus Reader Instance
87      * @param fetchTimeout Bus fetch timeout
88      * @param fetchLimit Bus fetch limit
89      * @param useHttps does the bus use https
90      * @param allowSelfSignedCerts are self-signed certificates allowed
91      * @throws IllegalArgumentException An invalid parameter passed in
92      */
93     public SingleThreadedBusTopicSource(List<String> servers, String topic, String apiKey, String apiSecret,
94             String consumerGroup, String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps,
95             boolean allowSelfSignedCerts) {
96
97         super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
98
99         if (consumerGroup == null || consumerGroup.isEmpty()) {
100             this.consumerGroup = UUID.randomUUID().toString();
101         } else {
102             this.consumerGroup = consumerGroup;
103         }
104
105         if (consumerInstance == null || consumerInstance.isEmpty()) {
106             this.consumerInstance = NetworkUtil.getHostname();
107         } else {
108             this.consumerInstance = consumerInstance;
109         }
110
111         if (fetchTimeout <= 0) {
112             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
113         } else {
114             this.fetchTimeout = fetchTimeout;
115         }
116
117         if (fetchLimit <= 0) {
118             this.fetchLimit = NO_LIMIT_FETCH;
119         } else {
120             this.fetchLimit = fetchLimit;
121         }
122
123     }
124
125     /**
126      * Initialize the Bus client
127      */
128     public abstract void init() throws MalformedURLException;
129
130     @Override
131     public void register(TopicListener topicListener) {
132
133         super.register(topicListener);
134
135         try {
136             if (!alive && !locked) {
137                 this.start();
138             } else {
139                 logger.info("{}: register: start not attempted", this);
140             }
141         } catch (Exception e) {
142             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
143                     e);
144         }
145     }
146
147     @Override
148     public void unregister(TopicListener topicListener) {
149         boolean stop;
150         synchronized (this) {
151             super.unregister(topicListener);
152             stop = this.topicListeners.isEmpty();
153         }
154
155         if (stop) {
156             this.stop();
157         }
158     }
159
160     @Override
161     public boolean start() {
162         logger.info("{}: starting", this);
163
164         synchronized (this) {
165
166             if (alive) {
167                 return true;
168             }
169
170             if (locked) {
171                 throw new IllegalStateException(this + " is locked.");
172             }
173
174             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
175
176                 try {
177                     this.init();
178                     this.alive = true;
179                     this.busPollerThread = new Thread(this);
180                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
181                     busPollerThread.start();
182                 } catch (Exception e) {
183                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
184                     throw new IllegalStateException(e);
185                 }
186             }
187         }
188
189         return this.alive;
190     }
191
192     @Override
193     public boolean stop() {
194         logger.info("{}: stopping", this);
195
196         synchronized (this) {
197             BusConsumer consumerCopy = this.consumer;
198
199             this.alive = false;
200             this.consumer = null;
201
202             if (consumerCopy != null) {
203                 try {
204                     consumerCopy.close();
205                 } catch (Exception e) {
206                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
207                 }
208             }
209         }
210
211         Thread.yield();
212
213         return true;
214     }
215
216     /**
217      * Run thread method for the Bus Reader
218      */
219     @Override
220     public void run() {
221         while (this.alive) {
222             try {
223                 for (String event : this.consumer.fetch()) {
224                     synchronized (this) {
225                         this.recentEvents.add(event);
226                     }
227
228                     netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
229                             System.lineSeparator(), event);
230
231                     broadcast(event);
232
233                     if (!this.alive) {
234                         break;
235                     }
236                 }
237             } catch (Exception e) {
238                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
239             }
240         }
241
242         logger.info("{}: exiting thread", this);
243     }
244
245     /**
246      * {@inheritDoc}
247      */
248     @Override
249     public boolean offer(String event) {
250         if (!this.alive) {
251             throw new IllegalStateException(this + " is not alive.");
252         }
253
254         synchronized (this) {
255             this.recentEvents.add(event);
256         }
257
258         netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
259
260
261         return broadcast(event);
262     }
263
264
265     @Override
266     public void setFilter(String filter) {
267         if (consumer instanceof FilterableBusConsumer) {
268             ((FilterableBusConsumer) consumer).setFilter(filter);
269
270         } else {
271             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
272         }
273     }
274
275     @Override
276     public String toString() {
277         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
278                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
279                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
280                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
281     }
282
283     /**
284      * {@inheritDoc}
285      */
286     @Override
287     public String getConsumerGroup() {
288         return consumerGroup;
289     }
290
291     /**
292      * {@inheritDoc}
293      */
294     @Override
295     public String getConsumerInstance() {
296         return consumerInstance;
297     }
298
299     /**
300      * {@inheritDoc}
301      */
302     @Override
303     public void shutdown() {
304         this.stop();
305         this.topicListeners.clear();
306     }
307
308     /**
309      * {@inheritDoc}
310      */
311     @Override
312     public int getFetchTimeout() {
313         return fetchTimeout;
314     }
315
316     /**
317      * {@inheritDoc}
318      */
319     @Override
320     public int getFetchLimit() {
321         return fetchLimit;
322     }
323
324 }