2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.common.endpoints.event.comm;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.jupiter.api.Assertions.assertEquals;
29 import static org.junit.jupiter.api.Assertions.assertFalse;
30 import static org.junit.jupiter.api.Assertions.assertSame;
31 import static org.junit.jupiter.api.Assertions.assertTrue;
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.Properties;
36 import org.junit.jupiter.api.AfterEach;
37 import org.junit.jupiter.api.Test;
38 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
39 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories;
40 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicPropertyBuilder;
41 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
42 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicPropertyBuilder;
43 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
44 import org.onap.policy.common.endpoints.parameters.TopicParameters;
45 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
46 import org.onap.policy.common.utils.gson.GsonTestUtils;
48 class TopicEndpointProxyTest {
50 private static final String NOOP_SOURCE_TOPIC = "noop-source";
51 private static final String NOOP_SINK_TOPIC = "noop-sink";
53 private static final String KAFKA_SOURCE_TOPIC = "kafka-source";
54 private static final String KAFKA_SINK_TOPIC = "kafka-sink";
56 private final Properties configuration = new Properties();
57 private final TopicParameterGroup group = new TopicParameterGroup();
62 public TopicEndpointProxyTest() {
63 group.setTopicSinks(new LinkedList<>());
64 group.setTopicSources(new LinkedList<>());
66 NoopTopicPropertyBuilder noopSourceBuilder =
67 new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS)
68 .makeTopic(NOOP_SOURCE_TOPIC);
69 configuration.putAll(noopSourceBuilder.build());
70 group.getTopicSources().add(noopSourceBuilder.getParams());
72 NoopTopicPropertyBuilder noopSinkBuilder =
73 new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS)
74 .makeTopic(NOOP_SINK_TOPIC);
75 configuration.putAll(noopSinkBuilder.build());
76 group.getTopicSinks().add(noopSinkBuilder.getParams());
78 TopicParameters invalidCommInfraParams =
79 new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS)
80 .makeTopic(NOOP_SOURCE_TOPIC).getParams();
81 invalidCommInfraParams.setTopicCommInfrastructure(Topic.CommInfrastructure.REST.name());
82 group.getTopicSources().add(invalidCommInfraParams);
83 group.getTopicSinks().add(invalidCommInfraParams);
86 private <T extends Topic> boolean exists(List<T> topics, String topicName) {
87 return topics.stream().map(Topic::getTopic).anyMatch(topicName::equals);
90 private <T extends Topic> boolean allSources(List<T> topics) {
91 return exists(topics, NOOP_SOURCE_TOPIC);
94 private <T extends Topic> boolean allSinks(List<T> topics) {
95 return exists(topics, NOOP_SINK_TOPIC);
98 private <T extends Topic> boolean anySource(List<T> topics) {
99 return exists(topics, NOOP_SOURCE_TOPIC);
102 private <T extends Topic> boolean anySink(List<T> topics) {
103 return exists(topics, NOOP_SINK_TOPIC);
107 * Destroys all managed topics.
110 public void tearDown() {
111 NoopTopicFactories.getSinkFactory().destroy();
112 NoopTopicFactories.getSourceFactory().destroy();
113 KafkaTopicFactories.getSinkFactory().destroy();
114 KafkaTopicFactories.getSourceFactory().destroy();
118 void testSerialize() {
119 TopicEndpoint manager = new TopicEndpointProxy();
121 manager.addTopicSources(configuration);
122 manager.addTopicSinks(configuration);
124 assertThatCode(() -> new GsonTestUtils().compareGson(manager, TopicEndpointProxyTest.class))
125 .doesNotThrowAnyException();
129 void testAddTopicSourcesListOfTopicParameters() {
130 TopicEndpoint manager = new TopicEndpointProxy();
132 List<TopicSource> sources = manager.addTopicSources(group.getTopicSources());
133 assertSame(1, sources.size());
135 assertTrue(allSources(sources));
136 assertFalse(anySink(sources));
138 sources = manager.addTopicSources(group.getTopicSources());
139 assertSame(1, sources.size());
140 assertTrue(allSources(sources));
144 void testAddTopicSourcesKafka() {
145 TopicEndpoint manager = new TopicEndpointProxy();
147 KafkaTopicPropertyBuilder kafkaTopicPropertyBuilder =
148 new KafkaTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS)
149 .makeTopic(KAFKA_SOURCE_TOPIC);
151 configuration.putAll(kafkaTopicPropertyBuilder.build());
152 group.getTopicSources().add(kafkaTopicPropertyBuilder.getParams());
153 List<TopicSource> sources = manager.addTopicSources(group.getTopicSources());
154 assertSame(2, sources.size());
156 configuration.remove(KAFKA_SOURCE_TOPIC);
157 group.setTopicSources(new LinkedList<>());
158 sources = manager.addTopicSources(group.getTopicSources());
159 assertSame(0, sources.size());
163 void testAddTopicSourcesProperties() {
164 TopicEndpoint manager = new TopicEndpointProxy();
166 List<TopicSource> sources = manager.addTopicSources(configuration);
167 assertSame(1, sources.size());
169 assertTrue(allSources(sources));
170 assertFalse(anySink(sources));
174 void testAddTopicSinksListOfTopicParameters() {
175 TopicEndpoint manager = new TopicEndpointProxy();
177 List<TopicSink> sinks = manager.addTopicSinks(group.getTopicSinks());
178 assertSame(1, sinks.size());
180 assertFalse(anySource(sinks));
181 assertTrue(allSinks(sinks));
185 void testAddTopicSinksListOfTopicParametersKafka() {
186 TopicEndpoint manager = new TopicEndpointProxy();
188 List<TopicSink> sinks = manager.addTopicSinks(group.getTopicSinks());
189 assertSame(1, sinks.size());
191 KafkaTopicPropertyBuilder kafkaTopicPropertyBuilder =
192 new KafkaTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS)
193 .makeTopic(KAFKA_SINK_TOPIC);
195 configuration.putAll(kafkaTopicPropertyBuilder.build());
196 group.getTopicSources().add(kafkaTopicPropertyBuilder.getParams());
197 sinks = manager.addTopicSinks(group.getTopicSources());
198 assertSame(2, sinks.size());
200 configuration.remove(KAFKA_SOURCE_TOPIC);
201 group.setTopicSources(new LinkedList<>());
202 sinks = manager.addTopicSinks(group.getTopicSources());
203 assertSame(0, sinks.size());
207 void testAddTopicSinksProperties() {
208 TopicEndpoint manager = new TopicEndpointProxy();
210 List<TopicSink> sinks = manager.addTopicSinks(configuration);
211 assertSame(1, sinks.size());
213 assertFalse(anySource(sinks));
214 assertTrue(allSinks(sinks));
218 void testAddTopicsProperties() {
219 TopicEndpoint manager = new TopicEndpointProxy();
221 List<Topic> topics = manager.addTopics(configuration);
222 assertSame(2, topics.size());
224 assertTrue(allSources(topics));
225 assertTrue(allSinks(topics));
229 void testAddTopicsTopicParameterGroup() {
230 TopicEndpoint manager = new TopicEndpointProxy();
232 List<Topic> topics = manager.addTopics(group);
233 assertSame(2, topics.size());
235 assertTrue(allSources(topics));
236 assertTrue(allSinks(topics));
240 void testAddTopicsTopicParameterGroupNull() {
241 TopicEndpoint manager = new TopicEndpointProxy();
243 List<Topic> topics = manager.addTopics(new TopicParameterGroup());
244 assertEquals(0, topics.size());
248 void testLockSinks_lockSources_locked() {
249 TopicEndpoint manager = new TopicEndpointProxy();
251 for (Topic topic : manager.addTopics(group)) {
252 assertTrue(topic.isLocked());
257 void testLockSinks_lockSources_unlocked() {
258 TopicEndpoint manager = new TopicEndpointProxy();
259 for (Topic topic : manager.addTopics(group)) {
260 assertFalse(topic.isLocked());
265 void testGetTopicSources() {
266 TopicEndpoint manager = new TopicEndpointProxy();
268 manager.addTopicSources(configuration);
269 manager.addTopicSinks(configuration);
271 List<TopicSource> sources = manager.getTopicSources();
272 assertSame(1, sources.size());
274 assertTrue(allSources(sources));
275 assertFalse(anySink(sources));
277 assertThatThrownBy(() -> manager.getKafkaTopicSource("testTopic"))
278 .hasMessageContaining("KafkaTopiceSource for testTopic not found");
280 List<String> topicName = null;
281 assertThatThrownBy(() -> manager.getTopicSources(topicName))
282 .hasMessageContaining("must provide a list of topics");
286 void testGetTopicSinks() {
287 TopicEndpoint manager = new TopicEndpointProxy();
289 manager.addTopicSources(configuration);
290 manager.addTopicSinks(configuration);
292 List<TopicSink> sinks = manager.getTopicSinks();
293 assertSame(1, sinks.size());
295 assertFalse(anySource(sinks));
296 assertTrue(allSinks(sinks));
298 final List<String> sinks2 = null;
299 assertThatThrownBy(() -> manager.getTopicSinks(sinks2)).hasMessageContaining("must provide a list of topics");
301 List<String> sinks3 = List.of(NOOP_SINK_TOPIC);
302 assertThatCode(() -> manager.getTopicSinks(sinks3)).doesNotThrowAnyException();
304 String sinkTest = null;
305 assertThatThrownBy(() -> manager.getTopicSinks(sinkTest))
306 .isInstanceOf(IllegalArgumentException.class)
307 .hasMessageContaining("Invalid parameter");
309 assertThatThrownBy(() -> manager.getKafkaTopicSink("testTopic"))
310 .hasMessageContaining("KafkaTopicSink for testTopic not found");
314 void testGetNoopTopicSources() {
315 TopicEndpoint manager = new TopicEndpointProxy();
317 manager.addTopicSources(configuration);
318 assertSame(1, manager.getNoopTopicSources().size());
322 void testGetNoopTopicSinks() {
323 TopicEndpoint manager = new TopicEndpointProxy();
325 manager.addTopicSinks(configuration);
326 assertSame(1, manager.getNoopTopicSinks().size());
330 void testLifecycle() {
331 TopicEndpoint manager = new TopicEndpointProxy();
333 assertTrue(manager.start());
334 assertTrue(manager.isAlive());
336 assertTrue(manager.stop());
337 assertFalse(manager.isAlive());
339 assertTrue(manager.start());
340 assertTrue(manager.isAlive());
343 assertFalse(manager.isAlive());
348 TopicEndpoint manager = new TopicEndpointProxy();
351 assertTrue(manager.isLocked());
354 assertFalse(manager.isLocked());
358 void testGetTopicSource() {
359 TopicEndpoint manager = new TopicEndpointProxy();
360 manager.addTopicSources(configuration);
362 assertSame(NOOP_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC).getTopic());
364 assertThatIllegalStateException()
365 .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SINK_TOPIC));
369 void testGetTopicSink() {
370 TopicEndpoint manager = new TopicEndpointProxy();
371 manager.addTopicSinks(configuration);
373 assertSame(NOOP_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SINK_TOPIC).getTopic());
375 assertThatIllegalStateException()
376 .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC));
380 void testGetNoopTopicSource() {
381 TopicEndpoint manager = new TopicEndpointProxy();
382 manager.addTopicSources(configuration);
384 assertSame(NOOP_SOURCE_TOPIC, manager.getNoopTopicSource(NOOP_SOURCE_TOPIC).getTopic());
386 assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(null));
387 assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(""));
391 void testGetNoopTopicSink() {
392 TopicEndpoint manager = new TopicEndpointProxy();
393 manager.addTopicSinks(configuration);
395 assertSame(NOOP_SINK_TOPIC, manager.getNoopTopicSink(NOOP_SINK_TOPIC).getTopic());
397 assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(null));
398 assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(""));