2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import java.util.List;
24 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
25 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
26 import org.onap.policy.common.endpoints.event.comm.TopicListener;
27 import org.onap.policy.common.endpoints.event.comm.TopicSink;
28 import org.onap.policy.common.endpoints.event.comm.TopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Manages the internal DMaaP topic. Assumes all topics are managed by
34 * {@link TopicEndpointManager#getManager()}.
36 public class DmaapManager {
// Per-instance state: the class logger, the topic name, the source and sink resolved for that
// topic, and two run-state flags that both start out false.
// NOTE(review): "consuming" and "publishing" are plain (non-volatile) booleans and no
// synchronization is visible in this extract - confirm how callers serialize access.
38 private static final Logger logger = LoggerFactory.getLogger(DmaapManager.class);
41 * Name of the DMaaP topic.
43 private final String topic;
46 * Topic source whose filter is to be manipulated.
48 private final TopicSource topicSource;
51 * Where to publish messages.
53 private final TopicSink topicSink;
56 * {@code True} if the consumer is running, {@code false} otherwise.
58 private boolean consuming = false;
61 * {@code True} if the publisher is running, {@code false} otherwise.
63 private boolean publishing = false;
66 * Constructs the manager, but does not start the source or sink.
68 * @param topic name of the internal DMaaP topic
69 * @throws PoolingFeatureException if an error occurs
// Logs the topic name, then resolves the topic's source and sink through findTopicSource()
// and findTopicSink(). An IllegalArgumentException raised during that lookup is logged and
// rethrown wrapped in a PoolingFeatureException.
// NOTE(review): the assignment of the "topic" field and the opening "try" are not visible in
// this extract - presumably they sit between the log call and the lookups; confirm.
// NOTE(review): both lookups go through the protected getTopicSources()/getTopicSinks(), so a
// subclass override of those runs before the subclass's own constructor body has executed.
71 public DmaapManager(String topic) throws PoolingFeatureException {
73 logger.info("initializing bus for topic {}", topic);
78 this.topicSource = findTopicSource();
79 this.topicSink = findTopicSink();
81 } catch (IllegalArgumentException e) {
// The exception itself is not passed to the logger; it travels as the cause of the rethrow.
82 logger.error("failed to attach to topic {}", topic);
83 throw new PoolingFeatureException(e);
// Public, no-argument accessor returning a String.
// NOTE(review): the method body is not visible in this extract - presumably it returns the
// "topic" field; confirm.
87 public String getTopic() {
92 * Finds the topic source associated with the internal DMaaP topic.
94 * @return the topic source
95 * @throws PoolingFeatureException if the source doesn't exist
// Walks the sources supplied by getTopicSources() looking for one whose getTopic() equals
// this manager's topic. If the loop completes without a match, a PoolingFeatureException
// carrying "missing topic source <topic>" is thrown.
// NOTE(review): the statement executed on a match is not visible in this extract -
// presumably it returns the matching source; confirm.
97 private TopicSource findTopicSource() throws PoolingFeatureException {
98 for (TopicSource src : getTopicSources()) {
// equals() is invoked on this.topic, so a source reporting a null topic simply does not match.
99 if (topic.equals(src.getTopic())) {
104 throw new PoolingFeatureException("missing topic source " + topic);
108 * Finds the topic sink associated with the internal DMaaP topic.
110 * @return the topic sink
111 * @throws PoolingFeatureException if the sink doesn't exist
// Walks the sinks supplied by getTopicSinks() looking for one whose getTopic() equals this
// manager's topic. If the loop completes without a match, a PoolingFeatureException carrying
// "missing topic sink <topic>" is thrown.
// NOTE(review): the statement executed on a match is not visible in this extract -
// presumably it returns the matching sink; confirm.
113 private TopicSink findTopicSink() throws PoolingFeatureException {
114 for (TopicSink sink : getTopicSinks()) {
// equals() is invoked on this.topic, so a sink reporting a null topic simply does not match.
115 if (topic.equals(sink.getTopic())) {
120 throw new PoolingFeatureException("missing topic sink " + topic);
124 * Starts the publisher, if it isn't already running.
// Logs, at info level, that publishing to the topic is starting.
// NOTE(review): the statements around the log call are not visible in this extract -
// presumably a guard on, and an update of, the "publishing" flag; confirm.
126 public void startPublisher() {
131 logger.info("start publishing to topic {}", topic);
136 * Stops the publisher.
138 * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
// Sleeps for waitMs milliseconds so the sink has a chance to drain, and logs that publishing
// to the topic has stopped. An interrupt during the sleep is logged as a warning and the
// thread's interrupt status is re-asserted.
// NOTE(review): any guard on the "publishing" flag, the flag update, any check on waitMs and
// the opening "try" are not visible in this extract; Thread.sleep rejects negative values
// with IllegalArgumentException, so confirm how waitMs <= 0 is handled.
141 public void stopPublisher(long waitMs) {
147 * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
148 * could be passed to sink.stop(), but that isn't an option at this time.
151 Thread.sleep(waitMs);
153 } catch (InterruptedException e) {
154 logger.warn("message transmission stopped due to {}", e.getMessage());
// Re-assert the interrupt so code further up the call stack can still observe it.
155 Thread.currentThread().interrupt();
158 logger.info("stop publishing to topic {}", topic);
163 * Starts the consumer, if it isn't already running.
165 * @param listener listener to register with the source
// Logs that consumption from the topic is starting and registers the given listener with the
// topic source.
// NOTE(review): the statements before the log call and after register() are not visible in
// this extract - presumably a guard on, and an update of, the "consuming" flag; confirm.
167 public void startConsumer(TopicListener listener) {
172 logger.info("start consuming from topic {}", topic);
173 topicSource.register(listener);
178 * Stops the consumer.
180 * @param listener listener to unregister with the source
// Logs that consumption from the topic is stopping and unregisters the given listener from
// the topic source.
// NOTE(review): the statements before the log call, and the one between the log call and
// unregister(), are not visible in this extract - presumably a guard on, and an update of,
// the "consuming" flag; confirm.
182 public void stopConsumer(TopicListener listener) {
187 logger.info("stop consuming from topic {}", topic);
189 topicSource.unregister(listener);
193 * Publishes a message to the sink.
195 * @param msg message to be published
196 * @throws PoolingFeatureException if an error occurs or the publisher isn't running
// Sends msg through the topic sink, reporting every failure as a PoolingFeatureException:
//   - "no topic sink <topic>" (wrapped IllegalStateException) from the first throw below;
//   - "failed to send to topic sink <topic>" when topicSink.send(msg) returns false;
//   - "cannot send to topic sink <topic>" when send() throws IllegalStateException, with the
//     original exception preserved as the cause.
// NOTE(review): the condition guarding the first throw and the opening "try" are not visible
// in this extract - presumably the guard tests the "publishing" flag; confirm.
198 public void publish(String msg) throws PoolingFeatureException {
200 throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
204 if (!topicSink.send(msg)) {
205 throw new PoolingFeatureException("failed to send to topic sink " + topic);
208 } catch (IllegalStateException e) {
209 throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
214 * The remaining methods may be overridden by junit tests.
220 * @return the topic sources
// Returns the topic sources reported by the manager obtained from
// TopicEndpointManager.getManager(). Declared protected so a subclass can substitute its own
// list; findTopicSource() is the caller visible in this file.
222 protected List<TopicSource> getTopicSources() {
223 return TopicEndpointManager.getManager().getTopicSources();
229 * @return the topic sinks
// Returns the topic sinks reported by the manager obtained from
// TopicEndpointManager.getManager(). Declared protected so a subclass can substitute its own
// list; findTopicSink() is the caller visible in this file.
231 protected List<TopicSink> getTopicSinks() {
232 return TopicEndpointManager.getManager().getTopicSinks();