2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.pooling;
24 import java.util.List;
26 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.TopicSink;
30 import org.onap.policy.common.endpoints.event.comm.TopicSource;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 * Manages the internal topic. Assumes all topics are managed by
36 * {@link TopicEndpoint}.
38 public class TopicMessageManager {
40 private static final Logger logger = LoggerFactory.getLogger(TopicMessageManager.class);
46 private final String topic;
49 * Topic source whose filter is to be manipulated.
51 private final TopicSource topicSource;
54 * Where to publish messages.
56 private final TopicSink topicSink;
59 * {@code True} if the consumer is running, {@code false} otherwise.
61 private boolean consuming = false;
64 * {@code True} if the publisher is running, {@code false} otherwise.
66 private boolean publishing = false;
69 * Constructs the manager, but does not start the source or sink.
71 * @param topic name of the internal topic
72 * @throws PoolingFeatureException if an error occurs
74 public TopicMessageManager(String topic) throws PoolingFeatureException {
76 logger.info("initializing bus for topic {}", topic);
81 this.topicSource = findTopicSource();
82 this.topicSink = findTopicSink();
84 } catch (IllegalArgumentException e) {
85 logger.error("failed to attach to topic {}", topic);
86 throw new PoolingFeatureException(e);
91 * Finds the topic source associated with the internal topic.
93 * @return the topic source
94 * @throws PoolingFeatureException if the source doesn't exist or is not filterable
96 private TopicSource findTopicSource() throws PoolingFeatureException {
97 for (TopicSource src : getTopicSources()) {
98 if (topic.equals(src.getTopic())) {
103 throw new PoolingFeatureException("missing topic source " + topic);
107 * Finds the topic sink associated with the internal topic.
109 * @return the topic sink
110 * @throws PoolingFeatureException if the sink doesn't exist
112 private TopicSink findTopicSink() throws PoolingFeatureException {
113 for (TopicSink sink : getTopicSinks()) {
114 if (topic.equals(sink.getTopic())) {
119 throw new PoolingFeatureException("missing topic sink " + topic);
123 * Starts the publisher, if it isn't already running.
125 public void startPublisher() {
130 logger.info("start publishing to topic {}", topic);
135 * Stops the publisher.
137 * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
140 public void stopPublisher(long waitMs) {
146 * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
147 * could be passed to sink.stop(), but that isn't an option at this time.
150 Thread.sleep(waitMs);
152 } catch (InterruptedException e) {
153 logger.warn("message transmission stopped due to {}", e.getMessage());
154 Thread.currentThread().interrupt();
157 logger.info("stop publishing to topic {}", topic);
162 * Starts the consumer, if it isn't already running.
164 * @param listener listener to register with the source
166 public void startConsumer(TopicListener listener) {
171 logger.info("start consuming from topic {}", topic);
172 topicSource.register(listener);
177 * Stops the consumer.
179 * @param listener listener to unregister with the source
181 public void stopConsumer(TopicListener listener) {
186 logger.info("stop consuming from topic {}", topic);
188 topicSource.unregister(listener);
192 * Publishes a message to the sink.
194 * @param msg message to be published
195 * @throws PoolingFeatureException if an error occurs or the publisher isn't running
197 public void publish(String msg) throws PoolingFeatureException {
199 throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
203 if (!topicSink.send(msg)) {
204 throw new PoolingFeatureException("failed to send to topic sink " + topic);
207 } catch (IllegalStateException e) {
208 throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
213 * The remaining methods may be overridden by junit tests.
219 * @return the topic sources
221 protected List<TopicSource> getTopicSources() {
222 return TopicEndpointManager.getManager().getTopicSources();
228 * @return the topic sinks
230 protected List<TopicSink> getTopicSinks() {
231 return TopicEndpointManager.getManager().getTopicSinks();