674dc40282098ccb5554ff756a62f1ca98dcc285
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / DmaapManager.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import java.util.List;
24
25 import org.onap.policy.common.endpoints.event.comm.FilterableTopicSource;
26 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.TopicSink;
30 import org.onap.policy.common.endpoints.event.comm.TopicSource;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Manages the internal DMaaP topic. Assumes all topics are managed by
36  * {@link TopicEndpoint#manager}.
37  */
38 public class DmaapManager {
39
40     private static final Logger logger = LoggerFactory.getLogger(DmaapManager.class);
41
42     /**
43      * Name of the DMaaP topic.
44      */
45     private final String topic;
46
47     /**
48      * Topic source whose filter is to be manipulated.
49      */
50     private final FilterableTopicSource topicSource;
51
52     /**
53      * Where to publish messages.
54      */
55     private final TopicSink topicSink;
56
57     /**
58      * {@code True} if the consumer is running, {@code false} otherwise.
59      */
60     private boolean consuming = false;
61
62     /**
63      * {@code True} if the publisher is running, {@code false} otherwise.
64      */
65     private boolean publishing = false;
66
67     /**
68      * Constructs the manager, but does not start the source or sink.
69      *
70      * @param topic name of the internal DMaaP topic
71      * @throws PoolingFeatureException if an error occurs
72      */
73     public DmaapManager(String topic) throws PoolingFeatureException {
74
75         logger.info("initializing bus for topic {}", topic);
76
77         try {
78             this.topic = topic;
79
80             this.topicSource = findTopicSource();
81             this.topicSink = findTopicSink();
82
83             // verify that we can set the filter
84             setFilter(null);
85
86         } catch (IllegalArgumentException e) {
87             logger.error("failed to attach to topic {}", topic);
88             throw new PoolingFeatureException(e);
89         }
90     }
91
92     public String getTopic() {
93         return topic;
94     }
95
96     /**
97      * Finds the topic source associated with the internal DMaaP topic.
98      *
99      * @return the topic source
100      * @throws PoolingFeatureException if the source doesn't exist or is not filterable
101      */
102     private FilterableTopicSource findTopicSource() throws PoolingFeatureException {
103         for (TopicSource src : getTopicSources()) {
104             if (topic.equals(src.getTopic())) {
105                 if (src instanceof FilterableTopicSource) {
106                     return (FilterableTopicSource) src;
107
108                 } else {
109                     throw new PoolingFeatureException("topic source " + topic + " is not filterable");
110                 }
111             }
112         }
113
114         throw new PoolingFeatureException("missing topic source " + topic);
115     }
116
117     /**
118      * Finds the topic sink associated with the internal DMaaP topic.
119      *
120      * @return the topic sink
121      * @throws PoolingFeatureException if the sink doesn't exist
122      */
123     private TopicSink findTopicSink() throws PoolingFeatureException {
124         for (TopicSink sink : getTopicSinks()) {
125             if (topic.equals(sink.getTopic())) {
126                 return sink;
127             }
128         }
129
130         throw new PoolingFeatureException("missing topic sink " + topic);
131     }
132
133     /**
134      * Starts the publisher, if it isn't already running.
135      */
136     public void startPublisher() {
137         if (publishing) {
138             return;
139         }
140
141         logger.info("start publishing to topic {}", topic);
142         publishing = true;
143     }
144
145     /**
146      * Stops the publisher.
147      *
148      * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
149      *        close
150      */
151     public void stopPublisher(long waitMs) {
152         if (!publishing) {
153             return;
154         }
155
156         /*
157          * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
158          * could be passed to sink.stop(), but that isn't an option at this time.
159          */
160         try {
161             Thread.sleep(waitMs);
162
163         } catch (InterruptedException e) {
164             logger.warn("message transmission stopped due to {}", e.getMessage());
165             Thread.currentThread().interrupt();
166         }
167
168         logger.info("stop publishing to topic {}", topic);
169         publishing = false;
170     }
171
172     /**
173      * Starts the consumer, if it isn't already running.
174      *
175      * @param listener listener to register with the source
176      */
177     public void startConsumer(TopicListener listener) {
178         if (consuming) {
179             return;
180         }
181
182         logger.info("start consuming from topic {}", topic);
183         topicSource.register(listener);
184         consuming = true;
185     }
186
187     /**
188      * Stops the consumer.
189      *
190      * @param listener listener to unregister with the source
191      */
192     public void stopConsumer(TopicListener listener) {
193         if (!consuming) {
194             return;
195         }
196
197         logger.info("stop consuming from topic {}", topic);
198         consuming = false;
199         topicSource.unregister(listener);
200     }
201
202     /**
203      * Sets the server-side filter to be used by the consumer.
204      *
205      * @param filter the filter string, or {@code null} if no filter is to be used
206      * @throws PoolingFeatureException if the topic is not filterable
207      */
208     public void setFilter(String filter) throws PoolingFeatureException {
209         try {
210             logger.debug("change filter for topic {} to {}", topic, filter);
211             topicSource.setFilter(filter);
212
213         } catch (UnsupportedOperationException e) {
214             throw new PoolingFeatureException("cannot filter topic " + topic, e);
215         }
216     }
217
218     /**
219      * Publishes a message to the sink.
220      *
221      * @param msg message to be published
222      * @throws PoolingFeatureException if an error occurs or the publisher isn't running
223      */
224     public void publish(String msg) throws PoolingFeatureException {
225         if (!publishing) {
226             throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
227         }
228
229         try {
230             if (!topicSink.send(msg)) {
231                 throw new PoolingFeatureException("failed to send to topic sink " + topic);
232             }
233
234         } catch (IllegalStateException e) {
235             throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
236         }
237     }
238
239     /*
240      * The remaining methods may be overridden by junit tests.
241      */
242
243     /**
244      * Get topic source.
245      *
246      * @return the topic sources
247      */
248     protected List<TopicSource> getTopicSources() {
249         return TopicEndpointManager.getManager().getTopicSources();
250     }
251
252     /**
253      * Get topic sinks.
254      *
255      * @return the topic sinks
256      */
257     protected List<TopicSink> getTopicSinks() {
258         return TopicEndpointManager.getManager().getTopicSinks();
259     }
260 }