Make feature-pooling-dmaap work without filtering
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / DmaapManager.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import java.util.List;
24 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
25 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
26 import org.onap.policy.common.endpoints.event.comm.TopicListener;
27 import org.onap.policy.common.endpoints.event.comm.TopicSink;
28 import org.onap.policy.common.endpoints.event.comm.TopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * Manages the internal DMaaP topic. Assumes all topics are managed by
34  * {@link TopicEndpoint#manager}.
35  */
36 public class DmaapManager {
37
38     private static final Logger logger = LoggerFactory.getLogger(DmaapManager.class);
39
40     /**
41      * Name of the DMaaP topic.
42      */
43     private final String topic;
44
45     /**
46      * Topic source whose filter is to be manipulated.
47      */
48     private final TopicSource topicSource;
49
50     /**
51      * Where to publish messages.
52      */
53     private final TopicSink topicSink;
54
55     /**
56      * {@code True} if the consumer is running, {@code false} otherwise.
57      */
58     private boolean consuming = false;
59
60     /**
61      * {@code True} if the publisher is running, {@code false} otherwise.
62      */
63     private boolean publishing = false;
64
65     /**
66      * Constructs the manager, but does not start the source or sink.
67      *
68      * @param topic name of the internal DMaaP topic
69      * @throws PoolingFeatureException if an error occurs
70      */
71     public DmaapManager(String topic) throws PoolingFeatureException {
72
73         logger.info("initializing bus for topic {}", topic);
74
75         try {
76             this.topic = topic;
77
78             this.topicSource = findTopicSource();
79             this.topicSink = findTopicSink();
80
81         } catch (IllegalArgumentException e) {
82             logger.error("failed to attach to topic {}", topic);
83             throw new PoolingFeatureException(e);
84         }
85     }
86
87     public String getTopic() {
88         return topic;
89     }
90
91     /**
92      * Finds the topic source associated with the internal DMaaP topic.
93      *
94      * @return the topic source
95      * @throws PoolingFeatureException if the source doesn't exist or is not filterable
96      */
97     private TopicSource findTopicSource() throws PoolingFeatureException {
98         for (TopicSource src : getTopicSources()) {
99             if (topic.equals(src.getTopic())) {
100                 return src;
101             }
102         }
103
104         throw new PoolingFeatureException("missing topic source " + topic);
105     }
106
107     /**
108      * Finds the topic sink associated with the internal DMaaP topic.
109      *
110      * @return the topic sink
111      * @throws PoolingFeatureException if the sink doesn't exist
112      */
113     private TopicSink findTopicSink() throws PoolingFeatureException {
114         for (TopicSink sink : getTopicSinks()) {
115             if (topic.equals(sink.getTopic())) {
116                 return sink;
117             }
118         }
119
120         throw new PoolingFeatureException("missing topic sink " + topic);
121     }
122
123     /**
124      * Starts the publisher, if it isn't already running.
125      */
126     public void startPublisher() {
127         if (publishing) {
128             return;
129         }
130
131         logger.info("start publishing to topic {}", topic);
132         publishing = true;
133     }
134
135     /**
136      * Stops the publisher.
137      *
138      * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
139      *        close
140      */
141     public void stopPublisher(long waitMs) {
142         if (!publishing) {
143             return;
144         }
145
146         /*
147          * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
148          * could be passed to sink.stop(), but that isn't an option at this time.
149          */
150         try {
151             Thread.sleep(waitMs);
152
153         } catch (InterruptedException e) {
154             logger.warn("message transmission stopped due to {}", e.getMessage());
155             Thread.currentThread().interrupt();
156         }
157
158         logger.info("stop publishing to topic {}", topic);
159         publishing = false;
160     }
161
162     /**
163      * Starts the consumer, if it isn't already running.
164      *
165      * @param listener listener to register with the source
166      */
167     public void startConsumer(TopicListener listener) {
168         if (consuming) {
169             return;
170         }
171
172         logger.info("start consuming from topic {}", topic);
173         topicSource.register(listener);
174         consuming = true;
175     }
176
177     /**
178      * Stops the consumer.
179      *
180      * @param listener listener to unregister with the source
181      */
182     public void stopConsumer(TopicListener listener) {
183         if (!consuming) {
184             return;
185         }
186
187         logger.info("stop consuming from topic {}", topic);
188         consuming = false;
189         topicSource.unregister(listener);
190     }
191
192     /**
193      * Publishes a message to the sink.
194      *
195      * @param msg message to be published
196      * @throws PoolingFeatureException if an error occurs or the publisher isn't running
197      */
198     public void publish(String msg) throws PoolingFeatureException {
199         if (!publishing) {
200             throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
201         }
202
203         try {
204             if (!topicSink.send(msg)) {
205                 throw new PoolingFeatureException("failed to send to topic sink " + topic);
206             }
207
208         } catch (IllegalStateException e) {
209             throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
210         }
211     }
212
213     /*
214      * The remaining methods may be overridden by junit tests.
215      */
216
217     /**
218      * Get topic source.
219      *
220      * @return the topic sources
221      */
222     protected List<TopicSource> getTopicSources() {
223         return TopicEndpointManager.getManager().getTopicSources();
224     }
225
226     /**
227      * Get topic sinks.
228      *
229      * @return the topic sinks
230      */
231     protected List<TopicSink> getTopicSinks() {
232         return TopicEndpointManager.getManager().getTopicSinks();
233     }
234 }