74912caed6312d2ee0d86cd3c7d1bf7dffdf59bc
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.UUID;
27
28 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
32 import org.onap.policy.common.utils.network.NetworkUtil;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * This topic source implementation specializes in reading messages over a bus topic source and
38  * notifying its listeners
39  */
40 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
41         implements Runnable, BusTopicSource, FilterableTopicSource {
42
43     /**
44      * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
45      * that in a single file in a concise format.
46      */
47     private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
48     private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
49
50     /**
51      * Bus consumer group
52      */
53     protected final String consumerGroup;
54
55     /**
56      * Bus consumer instance
57      */
58     protected final String consumerInstance;
59
60     /**
61      * Bus fetch timeout
62      */
63     protected final int fetchTimeout;
64
65     /**
66      * Bus fetch limit
67      */
68     protected final int fetchLimit;
69
70     /**
71      * Message Bus Consumer
72      */
73     protected BusConsumer consumer;
74
75     /**
76      * Independent thread reading message over my topic
77      */
78     protected Thread busPollerThread;
79
80
81     /**
82      * 
83      *
84      * @param busTopicParams@throws IllegalArgumentException An invalid parameter passed in
85      */
86     public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
87
88         super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getApiKey(), busTopicParams.getApiSecret(), busTopicParams.isUseHttps(), busTopicParams.isAllowSelfSignedCerts());
89
90         if (busTopicParams.getConsumerGroup() == null || busTopicParams.getConsumerGroup().isEmpty()) {
91             this.consumerGroup = UUID.randomUUID().toString();
92         } else {
93             this.consumerGroup = busTopicParams.getConsumerGroup();
94         }
95
96         if (busTopicParams.getConsumerInstance() == null || busTopicParams.getConsumerInstance().isEmpty()) {
97             this.consumerInstance = NetworkUtil.getHostname();
98         } else {
99             this.consumerInstance = busTopicParams.getConsumerInstance();
100         }
101
102         if (busTopicParams.getFetchTimeout() <= 0) {
103             this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
104         } else {
105             this.fetchTimeout = busTopicParams.getFetchTimeout();
106         }
107
108         if (busTopicParams.getFetchLimit() <= 0) {
109             this.fetchLimit = NO_LIMIT_FETCH;
110         } else {
111             this.fetchLimit = busTopicParams.getFetchLimit();
112         }
113
114     }
115
116     /**
117      * Initialize the Bus client
118      */
119     public abstract void init() throws MalformedURLException;
120
121     @Override
122     public void register(TopicListener topicListener) {
123
124         super.register(topicListener);
125
126         try {
127             if (!alive && !locked) {
128                 this.start();
129             } else {
130                 logger.info("{}: register: start not attempted", this);
131             }
132         } catch (Exception e) {
133             logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
134                     e);
135         }
136     }
137
138     @Override
139     public void unregister(TopicListener topicListener) {
140         boolean stop;
141         synchronized (this) {
142             super.unregister(topicListener);
143             stop = this.topicListeners.isEmpty();
144         }
145
146         if (stop) {
147             this.stop();
148         }
149     }
150
151     @Override
152     public boolean start() {
153         logger.info("{}: starting", this);
154
155         synchronized (this) {
156
157             if (alive) {
158                 return true;
159             }
160
161             if (locked) {
162                 throw new IllegalStateException(this + " is locked.");
163             }
164
165             if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
166
167                 try {
168                     this.init();
169                     this.alive = true;
170                     this.busPollerThread = new Thread(this);
171                     this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
172                     busPollerThread.start();
173                 } catch (Exception e) {
174                     logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
175                     throw new IllegalStateException(e);
176                 }
177             }
178         }
179
180         return this.alive;
181     }
182
183     @Override
184     public boolean stop() {
185         logger.info("{}: stopping", this);
186
187         synchronized (this) {
188             BusConsumer consumerCopy = this.consumer;
189
190             this.alive = false;
191             this.consumer = null;
192
193             if (consumerCopy != null) {
194                 try {
195                     consumerCopy.close();
196                 } catch (Exception e) {
197                     logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
198                 }
199             }
200         }
201
202         Thread.yield();
203
204         return true;
205     }
206
207     /**
208      * Run thread method for the Bus Reader
209      */
210     @Override
211     public void run() {
212         while (this.alive) {
213             try {
214                 for (String event : this.consumer.fetch()) {
215                     synchronized (this) {
216                         this.recentEvents.add(event);
217                     }
218
219                     netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
220                             System.lineSeparator(), event);
221
222                     broadcast(event);
223
224                     if (!this.alive) {
225                         break;
226                     }
227                 }
228             } catch (Exception e) {
229                 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
230             }
231         }
232
233         logger.info("{}: exiting thread", this);
234     }
235
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     public boolean offer(String event) {
241         if (!this.alive) {
242             throw new IllegalStateException(this + " is not alive.");
243         }
244
245         synchronized (this) {
246             this.recentEvents.add(event);
247         }
248
249         netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
250
251
252         return broadcast(event);
253     }
254
255
256     @Override
257     public void setFilter(String filter) {
258         if (consumer instanceof FilterableBusConsumer) {
259             ((FilterableBusConsumer) consumer).setFilter(filter);
260
261         } else {
262             throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
263         }
264     }
265
266     @Override
267     public String toString() {
268         return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
269                 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
270                 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
271                 + topicListeners.size() + ", toString()=" + super.toString() + "]";
272     }
273
274     /**
275      * {@inheritDoc}
276      */
277     @Override
278     public String getConsumerGroup() {
279         return consumerGroup;
280     }
281
282     /**
283      * {@inheritDoc}
284      */
285     @Override
286     public String getConsumerInstance() {
287         return consumerInstance;
288     }
289
290     /**
291      * {@inheritDoc}
292      */
293     @Override
294     public void shutdown() {
295         this.stop();
296         this.topicListeners.clear();
297     }
298
299     /**
300      * {@inheritDoc}
301      */
302     @Override
303     public int getFetchTimeout() {
304         return fetchTimeout;
305     }
306
307     /**
308      * {@inheritDoc}
309      */
310     @Override
311     public int getFetchLimit() {
312         return fetchLimit;
313     }
314
315     /**
316      * Member variables of this Params class are as follows
317      * servers DMaaP servers
318      * topic DMaaP Topic to be monitored
319      * apiKey DMaaP API Key (optional)
320      * apiSecret DMaaP API Secret (optional)
321      * consumerGroup DMaaP Reader Consumer Group
322      * consumerInstance DMaaP Reader Instance
323      * fetchTimeout DMaaP fetch timeout
324      * fetchLimit DMaaP fetch limit
325      * environment DME2 Environment
326      * aftEnvironment DME2 AFT Environment
327      * partner DME2 Partner
328      * latitude DME2 Latitude
329      * longitude DME2 Longitude
330      * additionalProps Additional properties to pass to DME2
331      * useHttps does connection use HTTPS?
332      * allowSelfSignedCerts are self-signed certificates allow
333      *
334      */
335     public static class BusTopicParams {
336
337         public static TopicParamsBuilder builder() {
338             return new TopicParamsBuilder();
339         }
340         private List<String> servers;
341         private String topic;
342         private String apiKey;
343         private String apiSecret;
344         private String consumerGroup;
345         private String consumerInstance;
346         private int fetchTimeout;
347         private int fetchLimit;
348         private boolean useHttps;
349         private boolean allowSelfSignedCerts;
350
351         private String userName;
352         private String password;
353         private String environment;
354         private String aftEnvironment;
355         private String partner;
356         private String latitude;
357         private String longitude;
358         private Map<String, String> additionalProps;
359
360         public String getUserName() {
361             return userName;
362         }
363
364         public String getPassword() {
365             return password;
366         }
367
368         public String getEnvironment() {
369             return environment;
370         }
371
372         public String getAftEnvironment() {
373             return aftEnvironment;
374         }
375
376         public String getPartner() {
377             return partner;
378         }
379
380         public String getLatitude() {
381             return latitude;
382         }
383
384         public String getLongitude() {
385             return longitude;
386         }
387
388         public Map<String, String> getAdditionalProps() {
389             return additionalProps;
390         }
391
392         public List<String> getServers() {
393             return servers;
394         }
395
396         public String getTopic() {
397             return topic;
398         }
399
400         public String getApiKey() {
401             return apiKey;
402         }
403
404         public String getApiSecret() {
405             return apiSecret;
406         }
407
408         public String getConsumerGroup() {
409             return consumerGroup;
410         }
411
412         public String getConsumerInstance() {
413             return consumerInstance;
414         }
415
416         public int getFetchTimeout() {
417             return fetchTimeout;
418         }
419
420         public int getFetchLimit() {
421             return fetchLimit;
422         }
423
424         public boolean isUseHttps() {
425             return useHttps;
426         }
427
428         public boolean isAllowSelfSignedCerts() {
429             return allowSelfSignedCerts;
430         }
431
432
433         public static class TopicParamsBuilder {
434             BusTopicParams m = new BusTopicParams();
435
436             private TopicParamsBuilder() {
437             }
438
439             public TopicParamsBuilder servers(List<String> servers) {
440                 this.m.servers = servers;
441                 return this;
442             }
443
444             public TopicParamsBuilder topic(String topic) {
445                 this.m.topic = topic;
446                 return this;
447             }
448
449             public TopicParamsBuilder apiKey(String apiKey) {
450                 this.m.apiKey = apiKey;
451                 return this;
452             }
453
454             public TopicParamsBuilder apiSecret(String apiSecret) {
455                 this.m.apiSecret = apiSecret;
456                 return this;
457             }
458
459             public TopicParamsBuilder consumerGroup(String consumerGroup) {
460                 this.m.consumerGroup = consumerGroup;
461                 return this;
462             }
463
464             public TopicParamsBuilder consumerInstance(String consumerInstance) {
465                 this.m.consumerInstance = consumerInstance;
466                 return this;
467             }
468
469             public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
470                 this.m.fetchTimeout = fetchTimeout;
471                 return this;
472             }
473
474             public TopicParamsBuilder fetchLimit(int fetchLimit) {
475                 this.m.fetchLimit = fetchLimit;
476                 return this;
477             }
478
479             public TopicParamsBuilder useHttps(boolean useHttps) {
480                 this.m.useHttps = useHttps;
481                 return this;
482             }
483
484             public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
485                 this.m.allowSelfSignedCerts = allowSelfSignedCerts;
486                 return this;
487             }
488
489             public TopicParamsBuilder userName(String userName) {
490                 this.m.userName = userName;
491                 return this;
492             }
493
494             public TopicParamsBuilder password(String password) {
495                 this.m.password = password;
496                 return this;
497             }
498
499             public TopicParamsBuilder environment(String environment) {
500                 this.m.environment = environment;
501                 return this;
502             }
503
504             public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
505                 this.m.aftEnvironment = aftEnvironment;
506                 return this;
507             }
508
509             public TopicParamsBuilder partner(String partner) {
510                 this.m.partner = partner;
511                 return this;
512             }
513
514             public TopicParamsBuilder latitude(String latitude) {
515                 this.m.latitude = latitude;
516                 return this;
517             }
518
519             public TopicParamsBuilder longitude(String longitude) {
520                 this.m.longitude = longitude;
521                 return this;
522             }
523
524             public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
525                 this.m.additionalProps = additionalProps;
526                 return this;
527             }
528
529             public BusTopicParams build() {
530                 return m;
531             }
532
533         }
534
535     }
536 }