a30904dd7a40635f82061243b5818bae6d6c38ee
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.jupiter.api.Assertions.assertEquals;
29 import static org.junit.jupiter.api.Assertions.assertFalse;
30 import static org.junit.jupiter.api.Assertions.assertSame;
31 import static org.junit.jupiter.api.Assertions.assertTrue;
32
33 import java.util.LinkedList;
34 import java.util.List;
35 import java.util.Properties;
36 import org.junit.jupiter.api.AfterEach;
37 import org.junit.jupiter.api.Test;
38 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
39 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicFactories;
40 import org.onap.policy.common.endpoints.event.comm.bus.KafkaTopicPropertyBuilder;
41 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicFactories;
42 import org.onap.policy.common.endpoints.event.comm.bus.NoopTopicPropertyBuilder;
43 import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
44 import org.onap.policy.common.endpoints.parameters.TopicParameters;
45 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
46 import org.onap.policy.common.utils.gson.GsonTestUtils;
47
48 class TopicEndpointProxyTest {
49
50     private static final String NOOP_SOURCE_TOPIC = "noop-source";
51     private static final String NOOP_SINK_TOPIC = "noop-sink";
52
53     private static final String KAFKA_SOURCE_TOPIC = "kafka-source";
54     private static final String KAFKA_SINK_TOPIC = "kafka-sink";
55
56     private final Properties configuration = new Properties();
57     private final TopicParameterGroup group = new TopicParameterGroup();
58
59     /**
60      * Constructor.
61      */
62     public TopicEndpointProxyTest() {
63         group.setTopicSinks(new LinkedList<>());
64         group.setTopicSources(new LinkedList<>());
65
66         NoopTopicPropertyBuilder noopSourceBuilder =
67                 new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS)
68                         .makeTopic(NOOP_SOURCE_TOPIC);
69         configuration.putAll(noopSourceBuilder.build());
70         group.getTopicSources().add(noopSourceBuilder.getParams());
71
72         NoopTopicPropertyBuilder noopSinkBuilder =
73                 new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS)
74                         .makeTopic(NOOP_SINK_TOPIC);
75         configuration.putAll(noopSinkBuilder.build());
76         group.getTopicSinks().add(noopSinkBuilder.getParams());
77
78         TopicParameters invalidCommInfraParams =
79                 new NoopTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS)
80                         .makeTopic(NOOP_SOURCE_TOPIC).getParams();
81         invalidCommInfraParams.setTopicCommInfrastructure(Topic.CommInfrastructure.REST.name());
82         group.getTopicSources().add(invalidCommInfraParams);
83         group.getTopicSinks().add(invalidCommInfraParams);
84     }
85
86     private <T extends Topic> boolean exists(List<T> topics, String topicName) {
87         return topics.stream().map(Topic::getTopic).anyMatch(topicName::equals);
88     }
89
90     private <T extends Topic> boolean allSources(List<T> topics) {
91         return exists(topics, NOOP_SOURCE_TOPIC);
92     }
93
94     private <T extends Topic> boolean allSinks(List<T> topics) {
95         return exists(topics, NOOP_SINK_TOPIC);
96     }
97
98     private <T extends Topic> boolean anySource(List<T> topics) {
99         return exists(topics, NOOP_SOURCE_TOPIC);
100     }
101
102     private <T extends Topic> boolean anySink(List<T> topics) {
103         return exists(topics, NOOP_SINK_TOPIC);
104     }
105
106     /**
107      * Destroys all managed topics.
108      */
109     @AfterEach
110     public void tearDown() {
111         NoopTopicFactories.getSinkFactory().destroy();
112         NoopTopicFactories.getSourceFactory().destroy();
113         KafkaTopicFactories.getSinkFactory().destroy();
114         KafkaTopicFactories.getSourceFactory().destroy();
115     }
116
117     @Test
118     void testSerialize() {
119         TopicEndpoint manager = new TopicEndpointProxy();
120
121         manager.addTopicSources(configuration);
122         manager.addTopicSinks(configuration);
123
124         assertThatCode(() -> new GsonTestUtils().compareGson(manager, TopicEndpointProxyTest.class))
125                 .doesNotThrowAnyException();
126     }
127
128     @Test
129     void testAddTopicSourcesListOfTopicParameters() {
130         TopicEndpoint manager = new TopicEndpointProxy();
131
132         List<TopicSource> sources = manager.addTopicSources(group.getTopicSources());
133         assertSame(1, sources.size());
134
135         assertTrue(allSources(sources));
136         assertFalse(anySink(sources));
137
138         sources = manager.addTopicSources(group.getTopicSources());
139         assertSame(1, sources.size());
140         assertTrue(allSources(sources));
141     }
142
143     @Test
144     void testAddTopicSourcesKafka() {
145         TopicEndpoint manager = new TopicEndpointProxy();
146
147         KafkaTopicPropertyBuilder kafkaTopicPropertyBuilder =
148             new KafkaTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_KAFKA_SOURCE_TOPICS)
149                 .makeTopic(KAFKA_SOURCE_TOPIC);
150
151         configuration.putAll(kafkaTopicPropertyBuilder.build());
152         group.getTopicSources().add(kafkaTopicPropertyBuilder.getParams());
153         List<TopicSource> sources = manager.addTopicSources(group.getTopicSources());
154         assertSame(2, sources.size());
155
156         configuration.remove(KAFKA_SOURCE_TOPIC);
157         group.setTopicSources(new LinkedList<>());
158         sources = manager.addTopicSources(group.getTopicSources());
159         assertSame(0, sources.size());
160     }
161
162     @Test
163     void testAddTopicSourcesProperties() {
164         TopicEndpoint manager = new TopicEndpointProxy();
165
166         List<TopicSource> sources = manager.addTopicSources(configuration);
167         assertSame(1, sources.size());
168
169         assertTrue(allSources(sources));
170         assertFalse(anySink(sources));
171     }
172
173     @Test
174     void testAddTopicSinksListOfTopicParameters() {
175         TopicEndpoint manager = new TopicEndpointProxy();
176
177         List<TopicSink> sinks = manager.addTopicSinks(group.getTopicSinks());
178         assertSame(1, sinks.size());
179
180         assertFalse(anySource(sinks));
181         assertTrue(allSinks(sinks));
182     }
183
184     @Test
185     void testAddTopicSinksListOfTopicParametersKafka() {
186         TopicEndpoint manager = new TopicEndpointProxy();
187
188         List<TopicSink> sinks = manager.addTopicSinks(group.getTopicSinks());
189         assertSame(1, sinks.size());
190
191         KafkaTopicPropertyBuilder kafkaTopicPropertyBuilder =
192             new KafkaTopicPropertyBuilder(PolicyEndPointProperties.PROPERTY_KAFKA_SINK_TOPICS)
193                 .makeTopic(KAFKA_SINK_TOPIC);
194
195         configuration.putAll(kafkaTopicPropertyBuilder.build());
196         group.getTopicSources().add(kafkaTopicPropertyBuilder.getParams());
197         sinks = manager.addTopicSinks(group.getTopicSources());
198         assertSame(2, sinks.size());
199
200         configuration.remove(KAFKA_SOURCE_TOPIC);
201         group.setTopicSources(new LinkedList<>());
202         sinks = manager.addTopicSinks(group.getTopicSources());
203         assertSame(0, sinks.size());
204     }
205
206     @Test
207     void testAddTopicSinksProperties() {
208         TopicEndpoint manager = new TopicEndpointProxy();
209
210         List<TopicSink> sinks = manager.addTopicSinks(configuration);
211         assertSame(1, sinks.size());
212
213         assertFalse(anySource(sinks));
214         assertTrue(allSinks(sinks));
215     }
216
217     @Test
218     void testAddTopicsProperties() {
219         TopicEndpoint manager = new TopicEndpointProxy();
220
221         List<Topic> topics = manager.addTopics(configuration);
222         assertSame(2, topics.size());
223
224         assertTrue(allSources(topics));
225         assertTrue(allSinks(topics));
226     }
227
228     @Test
229     void testAddTopicsTopicParameterGroup() {
230         TopicEndpoint manager = new TopicEndpointProxy();
231
232         List<Topic> topics = manager.addTopics(group);
233         assertSame(2, topics.size());
234
235         assertTrue(allSources(topics));
236         assertTrue(allSinks(topics));
237     }
238
239     @Test
240     void testAddTopicsTopicParameterGroupNull() {
241         TopicEndpoint manager = new TopicEndpointProxy();
242
243         List<Topic> topics = manager.addTopics(new TopicParameterGroup());
244         assertEquals(0, topics.size());
245     }
246
247     @Test
248     void testLockSinks_lockSources_locked() {
249         TopicEndpoint manager = new TopicEndpointProxy();
250         manager.lock();
251         for (Topic topic : manager.addTopics(group)) {
252             assertTrue(topic.isLocked());
253         }
254     }
255
256     @Test
257     void testLockSinks_lockSources_unlocked() {
258         TopicEndpoint manager = new TopicEndpointProxy();
259         for (Topic topic : manager.addTopics(group)) {
260             assertFalse(topic.isLocked());
261         }
262     }
263
264     @Test
265     void testGetTopicSources() {
266         TopicEndpoint manager = new TopicEndpointProxy();
267
268         manager.addTopicSources(configuration);
269         manager.addTopicSinks(configuration);
270
271         List<TopicSource> sources = manager.getTopicSources();
272         assertSame(1, sources.size());
273
274         assertTrue(allSources(sources));
275         assertFalse(anySink(sources));
276
277         assertThatThrownBy(() -> manager.getKafkaTopicSource("testTopic"))
278             .hasMessageContaining("KafkaTopiceSource for testTopic not found");
279
280         List<String> topicName = null;
281         assertThatThrownBy(() -> manager.getTopicSources(topicName))
282             .hasMessageContaining("must provide a list of topics");
283     }
284
285     @Test
286     void testGetTopicSinks() {
287         TopicEndpoint manager = new TopicEndpointProxy();
288
289         manager.addTopicSources(configuration);
290         manager.addTopicSinks(configuration);
291
292         List<TopicSink> sinks = manager.getTopicSinks();
293         assertSame(1, sinks.size());
294
295         assertFalse(anySource(sinks));
296         assertTrue(allSinks(sinks));
297
298         final List<String> sinks2 = null;
299         assertThatThrownBy(() -> manager.getTopicSinks(sinks2)).hasMessageContaining("must provide a list of topics");
300
301         List<String> sinks3 = List.of(NOOP_SINK_TOPIC);
302         assertThatCode(() -> manager.getTopicSinks(sinks3)).doesNotThrowAnyException();
303
304         String sinkTest = null;
305         assertThatThrownBy(() -> manager.getTopicSinks(sinkTest))
306             .isInstanceOf(IllegalArgumentException.class)
307             .hasMessageContaining("Invalid parameter");
308
309         assertThatThrownBy(() -> manager.getKafkaTopicSink("testTopic"))
310             .hasMessageContaining("KafkaTopicSink for testTopic not found");
311     }
312
313     @Test
314     void testGetNoopTopicSources() {
315         TopicEndpoint manager = new TopicEndpointProxy();
316
317         manager.addTopicSources(configuration);
318         assertSame(1, manager.getNoopTopicSources().size());
319     }
320
321     @Test
322     void testGetNoopTopicSinks() {
323         TopicEndpoint manager = new TopicEndpointProxy();
324
325         manager.addTopicSinks(configuration);
326         assertSame(1, manager.getNoopTopicSinks().size());
327     }
328
329     @Test
330     void testLifecycle() {
331         TopicEndpoint manager = new TopicEndpointProxy();
332
333         assertTrue(manager.start());
334         assertTrue(manager.isAlive());
335
336         assertTrue(manager.stop());
337         assertFalse(manager.isAlive());
338
339         assertTrue(manager.start());
340         assertTrue(manager.isAlive());
341
342         manager.shutdown();
343         assertFalse(manager.isAlive());
344     }
345
346     @Test
347     void testLock() {
348         TopicEndpoint manager = new TopicEndpointProxy();
349
350         manager.lock();
351         assertTrue(manager.isLocked());
352
353         manager.unlock();
354         assertFalse(manager.isLocked());
355     }
356
357     @Test
358     void testGetTopicSource() {
359         TopicEndpoint manager = new TopicEndpointProxy();
360         manager.addTopicSources(configuration);
361
362         assertSame(NOOP_SOURCE_TOPIC, manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC).getTopic());
363
364         assertThatIllegalStateException()
365                 .isThrownBy(() -> manager.getTopicSource(CommInfrastructure.NOOP, NOOP_SINK_TOPIC));
366     }
367
368     @Test
369     void testGetTopicSink() {
370         TopicEndpoint manager = new TopicEndpointProxy();
371         manager.addTopicSinks(configuration);
372
373         assertSame(NOOP_SINK_TOPIC, manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SINK_TOPIC).getTopic());
374
375         assertThatIllegalStateException()
376                 .isThrownBy(() -> manager.getTopicSink(CommInfrastructure.NOOP, NOOP_SOURCE_TOPIC));
377     }
378
379     @Test
380     void testGetNoopTopicSource() {
381         TopicEndpoint manager = new TopicEndpointProxy();
382         manager.addTopicSources(configuration);
383
384         assertSame(NOOP_SOURCE_TOPIC, manager.getNoopTopicSource(NOOP_SOURCE_TOPIC).getTopic());
385
386         assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(null));
387         assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSource(""));
388     }
389
390     @Test
391     void testGetNoopTopicSink() {
392         TopicEndpoint manager = new TopicEndpointProxy();
393         manager.addTopicSinks(configuration);
394
395         assertSame(NOOP_SINK_TOPIC, manager.getNoopTopicSink(NOOP_SINK_TOPIC).getTopic());
396
397         assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(null));
398         assertThatIllegalArgumentException().isThrownBy(() -> manager.getNoopTopicSink(""));
399     }
400 }