2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import java.net.MalformedURLException;
24 import java.util.List;
26 import java.util.UUID;
28 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
31 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
32 import org.onap.policy.common.utils.network.NetworkUtil;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * This topic source implementation specializes in reading messages over a bus topic source and
38 * notifying its listeners
40 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
41 implements Runnable, BusTopicSource, FilterableTopicSource {
44 * Not to be converted to PolicyLogger. This will contain all instract /out traffic and only
45 * that in a single file in a concise format.
47 private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
48 private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
53 protected final String consumerGroup;
56 * Bus consumer instance
58 protected final String consumerInstance;
63 protected final int fetchTimeout;
68 protected final int fetchLimit;
71 * Message Bus Consumer
73 protected BusConsumer consumer;
76 * Independent thread reading message over my topic
78 protected Thread busPollerThread;
84 * @param busTopicParams@throws IllegalArgumentException An invalid parameter passed in
86 public SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
88 super(busTopicParams.getServers(), busTopicParams.getTopic(), busTopicParams.getApiKey(), busTopicParams.getApiSecret(), busTopicParams.isUseHttps(), busTopicParams.isAllowSelfSignedCerts());
90 if (busTopicParams.getConsumerGroup() == null || busTopicParams.getConsumerGroup().isEmpty()) {
91 this.consumerGroup = UUID.randomUUID().toString();
93 this.consumerGroup = busTopicParams.getConsumerGroup();
96 if (busTopicParams.getConsumerInstance() == null || busTopicParams.getConsumerInstance().isEmpty()) {
97 this.consumerInstance = NetworkUtil.getHostname();
99 this.consumerInstance = busTopicParams.getConsumerInstance();
102 if (busTopicParams.getFetchTimeout() <= 0) {
103 this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
105 this.fetchTimeout = busTopicParams.getFetchTimeout();
108 if (busTopicParams.getFetchLimit() <= 0) {
109 this.fetchLimit = NO_LIMIT_FETCH;
111 this.fetchLimit = busTopicParams.getFetchLimit();
117 * Initialize the Bus client
119 public abstract void init() throws MalformedURLException;
122 public void register(TopicListener topicListener) {
124 super.register(topicListener);
127 if (!alive && !locked) {
130 logger.info("{}: register: start not attempted", this);
132 } catch (Exception e) {
133 logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e.getMessage(),
139 public void unregister(TopicListener topicListener) {
141 synchronized (this) {
142 super.unregister(topicListener);
143 stop = this.topicListeners.isEmpty();
152 public boolean start() {
153 logger.info("{}: starting", this);
155 synchronized (this) {
162 throw new IllegalStateException(this + " is locked.");
165 if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
170 this.busPollerThread = new Thread(this);
171 this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
172 busPollerThread.start();
173 } catch (Exception e) {
174 logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
175 throw new IllegalStateException(e);
184 public boolean stop() {
185 logger.info("{}: stopping", this);
187 synchronized (this) {
188 BusConsumer consumerCopy = this.consumer;
191 this.consumer = null;
193 if (consumerCopy != null) {
195 consumerCopy.close();
196 } catch (Exception e) {
197 logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
208 * Run thread method for the Bus Reader
214 for (String event : this.consumer.fetch()) {
215 synchronized (this) {
216 this.recentEvents.add(event);
219 netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic,
220 System.lineSeparator(), event);
228 } catch (Exception e) {
229 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
233 logger.info("{}: exiting thread", this);
240 public boolean offer(String event) {
242 throw new IllegalStateException(this + " is not alive.");
245 synchronized (this) {
246 this.recentEvents.add(event);
249 netLogger.info("[IN|{}|{}]{}{}", this.getTopicCommInfrastructure(), this.topic, System.lineSeparator(), event);
252 return broadcast(event);
257 public void setFilter(String filter) {
258 if (consumer instanceof FilterableBusConsumer) {
259 ((FilterableBusConsumer) consumer).setFilter(filter);
262 throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
267 public String toString() {
268 return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
269 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
270 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
271 + topicListeners.size() + ", toString()=" + super.toString() + "]";
278 public String getConsumerGroup() {
279 return consumerGroup;
286 public String getConsumerInstance() {
287 return consumerInstance;
294 public void shutdown() {
296 this.topicListeners.clear();
303 public int getFetchTimeout() {
311 public int getFetchLimit() {
316 * Member variables of this Params class are as follows
317 * servers DMaaP servers
318 * topic DMaaP Topic to be monitored
319 * apiKey DMaaP API Key (optional)
320 * apiSecret DMaaP API Secret (optional)
321 * consumerGroup DMaaP Reader Consumer Group
322 * consumerInstance DMaaP Reader Instance
323 * fetchTimeout DMaaP fetch timeout
324 * fetchLimit DMaaP fetch limit
325 * environment DME2 Environment
326 * aftEnvironment DME2 AFT Environment
327 * partner DME2 Partner
328 * latitude DME2 Latitude
329 * longitude DME2 Longitude
330 * additionalProps Additional properties to pass to DME2
331 * useHttps does connection use HTTPS?
332 * allowSelfSignedCerts are self-signed certificates allow
335 public static class BusTopicParams {
337 public static TopicParamsBuilder builder() {
338 return new TopicParamsBuilder();
340 private List<String> servers;
341 private String topic;
342 private String apiKey;
343 private String apiSecret;
344 private String consumerGroup;
345 private String consumerInstance;
346 private int fetchTimeout;
347 private int fetchLimit;
348 private boolean useHttps;
349 private boolean allowSelfSignedCerts;
351 private String userName;
352 private String password;
353 private String environment;
354 private String aftEnvironment;
355 private String partner;
356 private String latitude;
357 private String longitude;
358 private Map<String, String> additionalProps;
360 public String getUserName() {
364 public String getPassword() {
368 public String getEnvironment() {
372 public String getAftEnvironment() {
373 return aftEnvironment;
376 public String getPartner() {
380 public String getLatitude() {
384 public String getLongitude() {
388 public Map<String, String> getAdditionalProps() {
389 return additionalProps;
392 public List<String> getServers() {
396 public String getTopic() {
400 public String getApiKey() {
404 public String getApiSecret() {
408 public String getConsumerGroup() {
409 return consumerGroup;
412 public String getConsumerInstance() {
413 return consumerInstance;
416 public int getFetchTimeout() {
420 public int getFetchLimit() {
424 public boolean isUseHttps() {
428 public boolean isAllowSelfSignedCerts() {
429 return allowSelfSignedCerts;
433 public static class TopicParamsBuilder {
434 BusTopicParams m = new BusTopicParams();
436 private TopicParamsBuilder() {
439 public TopicParamsBuilder servers(List<String> servers) {
440 this.m.servers = servers;
444 public TopicParamsBuilder topic(String topic) {
445 this.m.topic = topic;
449 public TopicParamsBuilder apiKey(String apiKey) {
450 this.m.apiKey = apiKey;
454 public TopicParamsBuilder apiSecret(String apiSecret) {
455 this.m.apiSecret = apiSecret;
459 public TopicParamsBuilder consumerGroup(String consumerGroup) {
460 this.m.consumerGroup = consumerGroup;
464 public TopicParamsBuilder consumerInstance(String consumerInstance) {
465 this.m.consumerInstance = consumerInstance;
469 public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
470 this.m.fetchTimeout = fetchTimeout;
474 public TopicParamsBuilder fetchLimit(int fetchLimit) {
475 this.m.fetchLimit = fetchLimit;
479 public TopicParamsBuilder useHttps(boolean useHttps) {
480 this.m.useHttps = useHttps;
484 public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
485 this.m.allowSelfSignedCerts = allowSelfSignedCerts;
489 public TopicParamsBuilder userName(String userName) {
490 this.m.userName = userName;
494 public TopicParamsBuilder password(String password) {
495 this.m.password = password;
499 public TopicParamsBuilder environment(String environment) {
500 this.m.environment = environment;
504 public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
505 this.m.aftEnvironment = aftEnvironment;
509 public TopicParamsBuilder partner(String partner) {
510 this.m.partner = partner;
514 public TopicParamsBuilder latitude(String latitude) {
515 this.m.latitude = latitude;
519 public TopicParamsBuilder longitude(String longitude) {
520 this.m.longitude = longitude;
524 public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
525 this.m.additionalProps = additionalProps;
529 public BusTopicParams build() {