2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.event.comm.bus.internal;
23 import java.net.MalformedURLException;
24 import java.util.List;
25 import java.util.UUID;
26 import org.onap.policy.drools.event.comm.FilterableTopicSource;
27 import org.onap.policy.drools.event.comm.TopicListener;
28 import org.onap.policy.drools.event.comm.bus.BusTopicSource;
29 import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
30 import org.onap.policy.drools.utils.NetworkUtil;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 * This topic source implementation specializes in reading messages
36 * over a bus topic source and notifying its listeners
38 public abstract class SingleThreadedBusTopicSource
40 implements Runnable, BusTopicSource, FilterableTopicSource {
43 * Not to be converted to PolicyLogger.
44 * This will contain all instract /out traffic and only that in a single file in a concise format.
46 private static Logger logger = LoggerFactory.getLogger(InlineBusTopicSink.class);
47 private static final Logger netLogger = LoggerFactory.getLogger(NETWORK_LOGGER);
52 protected final String consumerGroup;
55 * Bus consumer instance
57 protected final String consumerInstance;
62 protected final int fetchTimeout;
67 protected final int fetchLimit;
70 * Message Bus Consumer
72 protected BusConsumer consumer;
75 * Independent thread reading message over my topic
77 protected Thread busPollerThread;
82 * @param servers Bus servers
83 * @param topic Bus Topic to be monitored
84 * @param apiKey Bus API Key (optional)
85 * @param apiSecret Bus API Secret (optional)
86 * @param consumerGroup Bus Reader Consumer Group
87 * @param consumerInstance Bus Reader Instance
88 * @param fetchTimeout Bus fetch timeout
89 * @param fetchLimit Bus fetch limit
90 * @param useHttps does the bus use https
91 * @param allowSelfSignedCerts are self-signed certificates allowed
92 * @throws IllegalArgumentException An invalid parameter passed in
94 public SingleThreadedBusTopicSource(List<String> servers,
99 String consumerInstance,
103 boolean allowSelfSignedCerts) {
105 super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts);
107 if (consumerGroup == null || consumerGroup.isEmpty()) {
108 this.consumerGroup = UUID.randomUUID ().toString();
110 this.consumerGroup = consumerGroup;
113 if (consumerInstance == null || consumerInstance.isEmpty()) {
114 this.consumerInstance = NetworkUtil.getHostname();
116 this.consumerInstance = consumerInstance;
119 if (fetchTimeout <= 0) {
120 this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
122 this.fetchTimeout = fetchTimeout;
125 if (fetchLimit <= 0) {
126 this.fetchLimit = NO_LIMIT_FETCH;
128 this.fetchLimit = fetchLimit;
134 * Initialize the Bus client
136 public abstract void init() throws MalformedURLException;
139 public void register(TopicListener topicListener) {
141 super.register(topicListener);
144 if (!alive && !locked)
147 logger.info("{}: register: start not attempted", this);
148 } catch (Exception e) {
149 logger.warn("{}: cannot start after registration of because of: {}",
150 this, topicListener, e.getMessage(), e);
155 public void unregister(TopicListener topicListener) {
157 synchronized (this) {
158 super.unregister(topicListener);
159 stop = this.topicListeners.isEmpty();
168 public boolean start() {
169 logger.info("{}: starting", this);
177 throw new IllegalStateException(this + " is locked.");
179 if (this.busPollerThread == null ||
180 !this.busPollerThread.isAlive() ||
181 this.consumer == null) {
186 this.busPollerThread = new Thread(this);
187 this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
188 busPollerThread.start();
189 } catch (Exception e) {
190 logger.warn("{}: cannot start because of {}", this, e.getMessage(), e);
191 throw new IllegalStateException(e);
200 public boolean stop() {
201 logger.info("{}: stopping", this);
204 BusConsumer consumerCopy = this.consumer;
207 this.consumer = null;
209 if (consumerCopy != null) {
211 consumerCopy.close();
212 } catch (Exception e) {
213 logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
224 * Run thread method for the Bus Reader
230 for (String event: this.consumer.fetch()) {
231 synchronized (this) {
232 this.recentEvents.add(event);
235 netLogger.info("[IN|{}|{}]{}{}",
236 this.getTopicCommInfrastructure(), this.topic,
237 System.lineSeparator(), event);
244 } catch (Exception e) {
245 logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
249 logger.info("{}: exiting thread", this);
256 public boolean offer(String event) {
258 throw new IllegalStateException(this + " is not alive.");
261 synchronized (this) {
262 this.recentEvents.add(event);
265 netLogger.info("[IN|{}|{}]{}{}",this.getTopicCommInfrastructure(),this.topic,
266 System.lineSeparator(), event);
269 return broadcast(event);
274 public void setFilter(String filter) {
275 if(consumer instanceof FilterableBusConsumer) {
276 ((FilterableBusConsumer) consumer).setFilter(filter);
279 throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
284 public String toString() {
285 return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
286 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer="
287 + this.consumer + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread
288 + ", topicListeners=" + topicListeners.size() + ", toString()=" + super.toString() + "]";
295 public String getConsumerGroup() {
296 return consumerGroup;
303 public String getConsumerInstance() {
304 return consumerInstance;
311 public void shutdown() {
313 this.topicListeners.clear();
320 public int getFetchTimeout() {
328 public int getFetchLimit() {