2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7 * Modifications Copyright (C) 2020 Bell Canada. All rights reserved.
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.common.endpoints.event.comm.bus.internal;
25 import java.io.IOException;
26 import java.net.MalformedURLException;
27 import java.util.UUID;
29 import org.onap.policy.common.endpoints.event.comm.TopicListener;
30 import org.onap.policy.common.endpoints.event.comm.bus.BusTopicSource;
31 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
32 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
33 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
34 import org.onap.policy.common.utils.network.NetworkUtil;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
39 * This topic source implementation specializes in reading messages over a bus topic source and
40 * notifying its listeners.
42 public abstract class SingleThreadedBusTopicSource extends BusTopicBase
43 implements Runnable, BusTopicSource {
46 * Not to be converted to PolicyLogger. This will contain all in/out traffic and only
47 * that in a single file in a concise format.
49 private static Logger logger = LoggerFactory.getLogger(SingleThreadedBusTopicSource.class);
55 protected final String consumerGroup;
58 * Bus consumer instance.
61 protected final String consumerInstance;
67 protected final int fetchTimeout;
73 protected final int fetchLimit;
76 * Message Bus Consumer.
78 protected BusConsumer consumer;
81 * Independent thread reading messages over this topic.
83 protected Thread busPollerThread;
89 * @param busTopicParams topic parameters
91 * @throws IllegalArgumentException An invalid parameter passed in
/**
 * Builds the source from the supplied topic parameters, substituting a default for each
 * consumer setting that the parameters flag as invalid or report as non-positive.
 *
 * @param busTopicParams topic parameters; also passed on to the superclass constructor
 */
protected SingleThreadedBusTopicSource(BusTopicParams busTopicParams) {
    super(busTopicParams);

    // Invalid consumer group: use a freshly generated random UUID instead.
    this.consumerGroup = busTopicParams.isConsumerGroupInvalid()
            ? UUID.randomUUID().toString()
            : busTopicParams.getConsumerGroup();

    // Invalid consumer instance: use whatever NetworkUtil.getHostname() reports.
    this.consumerInstance = busTopicParams.isConsumerInstanceInvalid()
            ? NetworkUtil.getHostname()
            : busTopicParams.getConsumerInstance();

    // Zero or negative values select the "no timeout" / "no limit" property constants.
    this.fetchTimeout = busTopicParams.getFetchTimeout() <= 0
            ? PolicyEndPointProperties.NO_TIMEOUT_MS_FETCH
            : busTopicParams.getFetchTimeout();

    this.fetchLimit = busTopicParams.getFetchLimit() <= 0
            ? PolicyEndPointProperties.NO_LIMIT_FETCH
            : busTopicParams.getFetchLimit();
}
124 * Initialize the Bus client.
126 public abstract void init() throws MalformedURLException;
129 public void register(TopicListener topicListener) {
131 super.register(topicListener);
134 if (!alive && !locked) {
137 logger.info("{}: register: start not attempted", this);
139 } catch (Exception e) {
140 logger.warn("{}: cannot start after registration of because of: {}", this, topicListener, e);
145 public void unregister(TopicListener topicListener) {
147 synchronized (this) {
148 super.unregister(topicListener);
149 stop = this.topicListeners.isEmpty();
158 public boolean start() {
159 logger.info("{}: starting", this);
161 synchronized (this) {
168 throw new IllegalStateException(this + " is locked.");
171 if (this.busPollerThread == null || !this.busPollerThread.isAlive() || this.consumer == null) {
176 this.busPollerThread = makePollerThread();
177 this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
178 busPollerThread.start();
180 } catch (Exception e) {
181 throw new IllegalStateException(this + ": cannot start", e);
190 * Makes a new thread to be used for polling.
192 * @return a new Thread
194 protected Thread makePollerThread() {
195 return new Thread(this);
199 public boolean stop() {
200 logger.info("{}: stopping", this);
202 synchronized (this) {
203 BusConsumer consumerCopy = this.consumer;
206 this.consumer = null;
208 if (consumerCopy != null) {
210 consumerCopy.close();
211 } catch (Exception e) {
212 logger.warn("{}: stop failed because of {}", this, e.getMessage(), e);
223 * Run thread method for the Bus Reader.
230 } catch (IOException | RuntimeException e) {
231 logger.error("{}: cannot fetch", this, e);
235 logger.info("{}: exiting thread", this);
238 private void fetchAllMessages() throws IOException {
239 for (String event : this.consumer.fetch()) {
240 synchronized (this) {
241 this.recentEvents.add(event);
244 NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
255 public boolean offer(String event) {
257 throw new IllegalStateException(this + " is not alive.");
260 synchronized (this) {
261 this.recentEvents.add(event);
264 NetLoggerUtil.log(EventType.IN, this.getTopicCommInfrastructure(), this.topic, event);
266 return broadcast(event);
270 public String toString() {
271 return "SingleThreadedBusTopicSource [consumerGroup=" + consumerGroup + ", consumerInstance=" + consumerInstance
272 + ", fetchTimeout=" + fetchTimeout + ", fetchLimit=" + fetchLimit + ", consumer=" + this.consumer
273 + ", alive=" + alive + ", locked=" + locked + ", uebThread=" + busPollerThread + ", topicListeners="
274 + topicListeners.size() + ", toString()=" + super.toString() + "]";
278 public void shutdown() {
280 this.topicListeners.clear();