Drools support for kafka topics
[policy/drools-pdp.git] / feature-pooling-messages / src / main / java / org / onap / policy / drools / pooling / TopicMessageManager.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.pooling;
23
24 import java.util.List;
25 import lombok.Getter;
26 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
27 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
28 import org.onap.policy.common.endpoints.event.comm.TopicListener;
29 import org.onap.policy.common.endpoints.event.comm.TopicSink;
30 import org.onap.policy.common.endpoints.event.comm.TopicSource;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * Manages the internal topic. Assumes all topics are managed by
36  * {@link TopicEndpoint}.
37  */
38 public class TopicMessageManager {
39
40     private static final Logger logger = LoggerFactory.getLogger(TopicMessageManager.class);
41
42     /**
43      * Name of the topic.
44      */
45     @Getter
46     private final String topic;
47
48     /**
49      * Topic source whose filter is to be manipulated.
50      */
51     private final TopicSource topicSource;
52
53     /**
54      * Where to publish messages.
55      */
56     private final TopicSink topicSink;
57
58     /**
59      * {@code True} if the consumer is running, {@code false} otherwise.
60      */
61     private boolean consuming = false;
62
63     /**
64      * {@code True} if the publisher is running, {@code false} otherwise.
65      */
66     private boolean publishing = false;
67
68     /**
69      * Constructs the manager, but does not start the source or sink.
70      *
71      * @param topic name of the internal topic
72      * @throws PoolingFeatureException if an error occurs
73      */
74     public TopicMessageManager(String topic) throws PoolingFeatureException {
75
76         logger.info("initializing bus for topic {}", topic);
77
78         try {
79             this.topic = topic;
80
81             this.topicSource = findTopicSource();
82             this.topicSink = findTopicSink();
83
84         } catch (IllegalArgumentException e) {
85             logger.error("failed to attach to topic {}", topic);
86             throw new PoolingFeatureException(e);
87         }
88     }
89
90     /**
91      * Finds the topic source associated with the internal topic.
92      *
93      * @return the topic source
94      * @throws PoolingFeatureException if the source doesn't exist or is not filterable
95      */
96     private TopicSource findTopicSource() throws PoolingFeatureException {
97         for (TopicSource src : getTopicSources()) {
98             if (topic.equals(src.getTopic())) {
99                 return src;
100             }
101         }
102
103         throw new PoolingFeatureException("missing topic source " + topic);
104     }
105
106     /**
107      * Finds the topic sink associated with the internal topic.
108      *
109      * @return the topic sink
110      * @throws PoolingFeatureException if the sink doesn't exist
111      */
112     private TopicSink findTopicSink() throws PoolingFeatureException {
113         for (TopicSink sink : getTopicSinks()) {
114             if (topic.equals(sink.getTopic())) {
115                 return sink;
116             }
117         }
118
119         throw new PoolingFeatureException("missing topic sink " + topic);
120     }
121
122     /**
123      * Starts the publisher, if it isn't already running.
124      */
125     public void startPublisher() {
126         if (publishing) {
127             return;
128         }
129
130         logger.info("start publishing to topic {}", topic);
131         publishing = true;
132     }
133
134     /**
135      * Stops the publisher.
136      *
137      * @param waitMs time, in milliseconds, to wait for the sink to transmit any queued messages and
138      *        close
139      */
140     public void stopPublisher(long waitMs) {
141         if (!publishing) {
142             return;
143         }
144
145         /*
146          * Give the sink a chance to transmit messages in the queue. It would be better if "waitMs"
147          * could be passed to sink.stop(), but that isn't an option at this time.
148          */
149         try {
150             Thread.sleep(waitMs);
151
152         } catch (InterruptedException e) {
153             logger.warn("message transmission stopped due to {}", e.getMessage());
154             Thread.currentThread().interrupt();
155         }
156
157         logger.info("stop publishing to topic {}", topic);
158         publishing = false;
159     }
160
161     /**
162      * Starts the consumer, if it isn't already running.
163      *
164      * @param listener listener to register with the source
165      */
166     public void startConsumer(TopicListener listener) {
167         if (consuming) {
168             return;
169         }
170
171         logger.info("start consuming from topic {}", topic);
172         topicSource.register(listener);
173         consuming = true;
174     }
175
176     /**
177      * Stops the consumer.
178      *
179      * @param listener listener to unregister with the source
180      */
181     public void stopConsumer(TopicListener listener) {
182         if (!consuming) {
183             return;
184         }
185
186         logger.info("stop consuming from topic {}", topic);
187         consuming = false;
188         topicSource.unregister(listener);
189     }
190
191     /**
192      * Publishes a message to the sink.
193      *
194      * @param msg message to be published
195      * @throws PoolingFeatureException if an error occurs or the publisher isn't running
196      */
197     public void publish(String msg) throws PoolingFeatureException {
198         if (!publishing) {
199             throw new PoolingFeatureException(new IllegalStateException("no topic sink " + topic));
200         }
201
202         try {
203             if (!topicSink.send(msg)) {
204                 throw new PoolingFeatureException("failed to send to topic sink " + topic);
205             }
206
207         } catch (IllegalStateException e) {
208             throw new PoolingFeatureException("cannot send to topic sink " + topic, e);
209         }
210     }
211
212     /*
213      * The remaining methods may be overridden by junit tests.
214      */
215
216     /**
217      * Get topic source.
218      *
219      * @return the topic sources
220      */
221     protected List<TopicSource> getTopicSources() {
222         return TopicEndpointManager.getManager().getTopicSources();
223     }
224
225     /**
226      * Get topic sinks.
227      *
228      * @return the topic sinks
229      */
230     protected List<TopicSink> getTopicSinks() {
231         return TopicEndpointManager.getManager().getTopicSinks();
232     }
233 }