Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-dmaap / src / main / java / org / onap / policy / drools / pooling / DmaapManager.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.drools.pooling;
22
23 import java.util.List;
24 import lombok.Getter;
25 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
26 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
27 import org.onap.policy.common.endpoints.event.comm.TopicListener;
28 import org.onap.policy.common.endpoints.event.comm.TopicSink;
29 import org.onap.policy.common.endpoints.event.comm.TopicSource;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Manages the internal DMaaP topic. Assumes all topics are managed by
35  * {@link TopicEndpoint#manager}.
36  */
37 public class DmaapManager {
38
39     private static final Logger logger = LoggerFactory.getLogger(DmaapManager.class);
40
41     /**
42      * Name of the DMaaP topic.
43      */
44     @Getter
45     private final String topic;
46
47     /**
48      * Topic source whose filter is to be manipulated.
49      */
50     private final TopicSource topicSource;
51
52     /**
53      * Where to publish messages.
54      */
55     private final TopicSink topicSink;
56
57     /**
58      * {@code True} if the consumer is running, {@code false} otherwise.
59      */
60     private boolean consuming = false;
61
62     /**
63      * {@code True} if the publisher is running, {@code false} otherwise.
64      */
65     private boolean publishing = false;
66
67     /**
68      * Constructs the manager, but does not start the source or sink.
69      *
70      * @param topic name of the internal DMaaP topic
71      * @throws PoolingFeatureException if an error occurs
72      */
73     public DmaapManager(String topic) throws PoolingFeatureException {
74
75         logger.info("initializing bus for topic {}", topic);
76
77         try {
78             this.topic = topic;
79
80             this.topicSource = findTopicSource();
81             this.topicSink = findTopicSink();
82
83         } catch (IllegalArgumentException e) {
84             logger.error("failed to attach to topic {}", topic);
85             throw new PoolingFeatureException(e);
86         }
87     }
88
89     /**
90      * Finds the topic source associated with the internal DMaaP topic.
91      *
92      * @return the topic source
93      * @throws PoolingFeatureException if the source doesn't exist or is not filterable
94      */
95     private TopicSource findTopicSource() throws PoolingFeatureException {
96         for (TopicSource src : getTopicSources()) {
97             if (topic.equals(src.getTopic())) {
98                 return src;
99             }
100         }
101
102         throw new PoolingFeatureException("missing topic source " + topic);
103     }
104
105     /**
106      * Finds the topic sink associated with the internal DMaaP topic.
107      *
108      * @return the topic sink
109      * @throws PoolingFeatureException if the sink doesn't exist
110      */
111     private TopicSink findTopicSink() throws PoolingFeatureException {
112         for (TopicSink sink : getTopicSinks()) {
113             if (topic.equals(sink.getTopic())) {
114                 return sink;
115             }
116         }
117
118         throw new PoolingFeatureException("missing topic sink " + topic);
119     }
120
121     /**
122      * Starts the publisher, if it isn't already running.
123      */
124     public void startPublisher() {
125         if (publishing) {
126             return;
127         }
128
129         logger.info("start publishing to topic {}", topic);
130         publishing = true;
131     }
132
133     /**
134      * Stops the publisher.
135      *
136      * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
137      *        close
138      */
139     public void stopPublisher(long waitMs) {
140         if (!publishing) {
141             return;
142         }
143
144         /*
145          * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
146          * could be passed to sink.stop(), but that isn't an option at this time.
147          */
148         try {
149             Thread.sleep(waitMs);
150
151         } catch (InterruptedException e) {
152             logger.warn("message transmission stopped due to {}", e.getMessage());
153             Thread.currentThread().interrupt();
154         }
155
156         logger.info("stop publishing to topic {}", topic);
157         publishing = false;
158     }
159
160     /**
161      * Starts the consumer, if it isn't already running.
162      *
163      * @param listener listener to register with the source
164      */
165     public void startConsumer(TopicListener listener) {
166         if (consuming) {
167             return;
168         }
169
170         logger.info("start consuming from topic {}", topic);
171         topicSource.register(listener);
172         consuming = true;
173     }
174
175     /**
176      * Stops the consumer.
177      *
178      * @param listener listener to unregister with the source
179      */
180     public void stopConsumer(TopicListener listener) {
181         if (!consuming) {
182             return;
183         }
184
185         logger.info("stop consuming from topic {}", topic);
186         consuming = false;
187         topicSource.unregister(listener);
188     }
189
190     /**
191      * Publishes a message to the sink.
192      *
193      * @param msg message to be published
194      * @throws PoolingFeatureException if an error occurs or the publisher isn't running
195      */
196     public void publish(String msg) throws PoolingFeatureException {
197         if (!publishing) {
198             throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
199         }
200
201         try {
202             if (!topicSink.send(msg)) {
203                 throw new PoolingFeatureException("failed to send to topic sink " + topic);
204             }
205
206         } catch (IllegalStateException e) {
207             throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
208         }
209     }
210
211     /*
212      * The remaining methods may be overridden by junit tests.
213      */
214
215     /**
216      * Get topic source.
217      *
218      * @return the topic sources
219      */
220     protected List<TopicSource> getTopicSources() {
221         return TopicEndpointManager.getManager().getTopicSources();
222     }
223
224     /**
225      * Get topic sinks.
226      *
227      * @return the topic sinks
228      */
229     protected List<TopicSink> getTopicSinks() {
230         return TopicEndpointManager.getManager().getTopicSinks();
231     }
232 }