2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.pooling;
23 import java.util.List;
25 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
26 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.TopicSink;
30 import org.onap.policy.common.endpoints.event.comm.TopicSource;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
35 * Manages the internal DMaaP topic. Assumes all topics are managed by
36 * {@link TopicEndpointManager#getManager()}.
38 public class DmaapManager {
40 private static final Logger logger = LoggerFactory.getLogger(DmaapManager.class);
43 * Name of the DMaaP topic.
45 private final String topic;
48 * Topic source whose filter is to be manipulated.
50 private final FilterableTopicSource topicSource;
53 * Where to publish messages.
55 private final TopicSink topicSink;
58 * {@code True} if the consumer is running, {@code false} otherwise.
60 private boolean consuming = false;
63 * {@code True} if the publisher is running, {@code false} otherwise.
65 private boolean publishing = false;
68 * Constructs the manager, but does not start the source or sink.
70 * @param topic name of the internal DMaaP topic
71 * @throws PoolingFeatureException if an error occurs
// Builds the manager by resolving the source and sink registered for the given topic name.
73 public DmaapManager(String topic) throws PoolingFeatureException {
75 logger.info("initializing bus for topic {}", topic);
// NOTE(review): both lookup helpers compare against the "topic" field, so it presumably
// has to be assigned before these two calls - confirm.
// Each helper throws PoolingFeatureException when no matching endpoint is found.
80 this.topicSource = findTopicSource();
81 this.topicSink = findTopicSink();
83 // verify that we can set the filter
// An IllegalArgumentException is logged and re-thrown wrapped in the feature's checked
// exception, with the original exception kept as the cause.
86 } catch (IllegalArgumentException e) {
87 logger.error("failed to attach to topic {}", topic);
88 throw new PoolingFeatureException(e);
// Accessor for the topic name.
// NOTE(review): presumably returns the "topic" field - confirm.
92 public String getTopic() {
97 * Finds the topic source associated with the internal DMaaP topic.
99 * @return the topic source
100 * @throws PoolingFeatureException if the source doesn't exist or is not filterable
102 private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
103 for (TopicSource src : getTopicSources()) {
104 if (topic.equals(src.getTopic())) {
105 if (src instanceof FilterableTopicSource) {
106 return (FilterableTopicSource) src;
109 throw new PoolingFeatureException("topic source " + topic + " is not filterable");
114 throw new PoolingFeatureException("missing topic source " + topic);
118 * Finds the topic sink associated with the internal DMaaP topic.
120 * @return the topic sink
121 * @throws PoolingFeatureException if the sink doesn't exist
// Looks through the available sinks for the one attached to this manager's topic.
123 private TopicSink findTopicSink() throws PoolingFeatureException {
124 for (TopicSink sink : getTopicSinks()) {
// A sink matches when its topic name equals ours.
// NOTE(review): presumably the matching sink is returned at this point - confirm.
125 if (topic.equals(sink.getTopic())) {
// Reached when no sink carried the topic name.
130 throw new PoolingFeatureException("missing topic sink " + topic);
134 * Starts the publisher, if it isn't already running.
// Marks the start of publishing for this topic; the visible action is an info-level log entry.
// NOTE(review): presumably a repeated call is a no-op, tracked by the "publishing" flag - confirm.
136 public void startPublisher() {
141 logger.info("start publishing to topic {}", topic);
146 * Stops the publisher.
148 * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
// Stops publishing after pausing the calling thread for the requested time.
151 public void stopPublisher(long waitMs) {
157 * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
158 * could be passed to sink.stop(), but that isn't an option at this time.
// Blocks the calling thread for waitMs milliseconds.
161 Thread.sleep(waitMs);
// If the wait is interrupted, log the reason and restore the thread's interrupt status so
// that callers further up the stack can still observe the interruption.
163 } catch (InterruptedException e) {
164 logger.warn("message transmission stopped due to {}", e.getMessage());
165 Thread.currentThread().interrupt();
// Logged whether or not the wait above was interrupted.
168 logger.info("stop publishing to topic {}", topic);
173 * Starts the consumer, if it isn't already running.
175 * @param listener listener to register with the source
// Begins consuming from the topic by attaching the given listener to the topic source.
// NOTE(review): presumably a repeated call is a no-op, tracked by the "consuming" flag - confirm.
177 public void startConsumer(TopicListener listener) {
182 logger.info("start consuming from topic {}", topic);
// From here on the source has the listener registered.
183 topicSource.register(listener);
188 * Stops the consumer.
190 * @param listener listener to unregister with the source
// Ends consumption from the topic by detaching the given listener from the topic source.
// NOTE(review): presumably does nothing when the consumer is not running - confirm.
192 public void stopConsumer(TopicListener listener) {
197 logger.info("stop consuming from topic {}", topic);
// Removes the listener registration from the source.
199 topicSource.unregister(listener);
203 * Sets the server-side filter to be used by the consumer.
205 * @param filter the filter string, or {@code null} if no filter is to be used
206 * @throws PoolingFeatureException if the topic is not filterable
208 public void setFilter(String filter) throws PoolingFeatureException {
210 logger.debug("change filter for topic {} to {}", topic, filter);
211 topicSource.setFilter(filter);
213 } catch (UnsupportedOperationException e) {
214 throw new PoolingFeatureException("cannot filter topic " + topic, e);
219 * Publishes a message to the sink.
221 * @param msg message to be published
222 * @throws PoolingFeatureException if an error occurs or the publisher isn't running
// Sends a message through the topic sink, reporting every failure as PoolingFeatureException.
224 public void publish(String msg) throws PoolingFeatureException {
// NOTE(review): presumably thrown only when the publisher is not running - confirm.
// The IllegalStateException is created purely to serve as the cause of the checked exception.
226 throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
// A false result from send() is treated as a failed transmission.
230 if (!topicSink.send(msg)) {
231 throw new PoolingFeatureException("failed to send to topic sink " + topic);
// An IllegalStateException is wrapped as well, with the original kept as the cause.
234 } catch (IllegalStateException e) {
235 throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
240 * The remaining methods may be overridden by junit tests.
246 * @return the topic sources
248 protected List<TopicSource> getTopicSources() {
249 return TopicEndpointManager.getManager().getTopicSources();
255 * @return the topic sinks
257 protected List<TopicSink> getTopicSinks() {
258 return TopicEndpointManager.getManager().getTopicSinks();