2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import java.util.List;
25 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
26 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
27 import org.onap.policy.common.endpoints.event.comm.TopicListener;
28 import org.onap.policy.common.endpoints.event.comm.TopicSink;
29 import org.onap.policy.common.endpoints.event.comm.TopicSource;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * Manages the internal DMaaP topic. Assumes all topics are managed by
35 * {@link TopicEndpoint#manager}.
37 public class DmaapManager {
39 private static final Logger logger = LoggerFactory.getLogger(DmaapManager.class);
42 * Name of the DMaaP topic.
45 private final String topic;
48 * Topic source whose filter is to be manipulated.
50 private final TopicSource topicSource;
53 * Where to publish messages.
55 private final TopicSink topicSink;
58 * {@code True} if the consumer is running, {@code false} otherwise.
60 private boolean consuming = false;
63 * {@code True} if the publisher is running, {@code false} otherwise.
65 private boolean publishing = false;
68 * Constructs the manager, but does not start the source or sink.
70 * @param topic name of the internal DMaaP topic
71 * @throws PoolingFeatureException if an error occurs
73 public DmaapManager(String topic) throws PoolingFeatureException {
75 logger.info("initializing bus for topic {}", topic);
80 this.topicSource = findTopicSource();
81 this.topicSink = findTopicSink();
83 } catch (IllegalArgumentException e) {
84 logger.error("failed to attach to topic {}", topic);
85 throw new PoolingFeatureException(e);
90 * Finds the topic source associated with the internal DMaaP topic.
92 * @return the topic source
93 * @throws PoolingFeatureException if the source doesn't exist or is not filterable
95 private TopicSource findTopicSource() throws PoolingFeatureException {
96 for (TopicSource src : getTopicSources()) {
97 if (topic.equals(src.getTopic())) {
102 throw new PoolingFeatureException("missing topic source " + topic);
106 * Finds the topic sink associated with the internal DMaaP topic.
108 * @return the topic sink
109 * @throws PoolingFeatureException if the sink doesn't exist
111 private TopicSink findTopicSink() throws PoolingFeatureException {
112 for (TopicSink sink : getTopicSinks()) {
113 if (topic.equals(sink.getTopic())) {
118 throw new PoolingFeatureException("missing topic sink " + topic);
122 * Starts the publisher, if it isn't already running.
124 public void startPublisher() {
129 logger.info("start publishing to topic {}", topic);
134 * Stops the publisher.
136 * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
139 public void stopPublisher(long waitMs) {
145 * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
146 * could be passed to sink.stop(), but that isn't an option at this time.
149 Thread.sleep(waitMs);
151 } catch (InterruptedException e) {
152 logger.warn("message transmission stopped due to {}", e.getMessage());
153 Thread.currentThread().interrupt();
156 logger.info("stop publishing to topic {}", topic);
161 * Starts the consumer, if it isn't already running.
163 * @param listener listener to register with the source
165 public void startConsumer(TopicListener listener) {
170 logger.info("start consuming from topic {}", topic);
171 topicSource.register(listener);
176 * Stops the consumer.
178 * @param listener listener to unregister with the source
180 public void stopConsumer(TopicListener listener) {
185 logger.info("stop consuming from topic {}", topic);
187 topicSource.unregister(listener);
191 * Publishes a message to the sink.
193 * @param msg message to be published
194 * @throws PoolingFeatureException if an error occurs or the publisher isn't running
196 public void publish(String msg) throws PoolingFeatureException {
198 throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
202 if (!topicSink.send(msg)) {
203 throw new PoolingFeatureException("failed to send to topic sink " + topic);
206 } catch (IllegalStateException e) {
207 throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
212 * The remaining methods may be overridden by junit tests.
218 * @return the topic sources
220 protected List<TopicSource> getTopicSources() {
221 return TopicEndpointManager.getManager().getTopicSources();
227 * @return the topic sinks
229 protected List<TopicSink> getTopicSinks() {
230 return TopicEndpointManager.getManager().getTopicSinks();