7aa70b2ab29feb4e2ddbc14a8a083418a902a42e
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2020 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicListener;
import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
import org.onap.policy.common.utils.gson.GsonTestUtils;
44
45 public class InlineBusTopicSinkTest extends TopicTestBase {
46
47     private InlineBusTopicSinkImpl sink;
48
49     /**
50      * Creates the object to be tested.
51      */
52     @Before
53     @Override
54     public void setUp() {
55         super.setUp();
56
57         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
58     }
59
60     @After
61     public void tearDown() {
62         sink.shutdown();
63     }
64
65     @Test
66     public void testSerialize() {
67         assertThatCode(() -> new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class))
68                         .doesNotThrowAnyException();
69     }
70
71     @Test
72     public void testInlineBusTopicSinkImpl() {
73         // verify that different wrappers can be built
74         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
75         assertEquals(MY_PARTITION, sink.getPartitionKey());
76
77         sink = new InlineBusTopicSinkImpl(makeBuilder().partitionId(null).build());
78         assertNotNull(sink.getPartitionKey());
79     }
80
81     @Test
82     public void testStart() {
83         assertTrue(sink.start());
84         assertEquals(1, sink.initCount);
85
86         // re-start, init() should not be invoked again
87         assertTrue(sink.start());
88         assertEquals(1, sink.initCount);
89     }
90
91     @Test(expected = IllegalStateException.class)
92     public void testStart_Locked() {
93         sink.lock();
94         sink.start();
95     }
96
97     @Test
98     public void testStop() {
99         BusPublisher pub = mock(BusPublisher.class);
100         sink.publisher = pub;
101
102         assertTrue(sink.stop());
103         verify(pub).close();
104
105         // stop again, shouldn't not invoke close() again
106         assertFalse(sink.stop());
107         verify(pub).close();
108
109         // publisher throws exception
110         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
111         sink.publisher = pub;
112         doThrow(new RuntimeException(EXPECTED)).when(pub).close();
113         assertTrue(sink.stop());
114     }
115
116     @Test
117     public void testSend() {
118         sink.start();
119         BusPublisher pub = mock(BusPublisher.class);
120         sink.publisher = pub;
121
122         TopicListener listener = mock(TopicListener.class);
123         sink.register(listener);
124
125         assertTrue(sink.send(MY_MESSAGE));
126
127         verify(pub).send(MY_PARTITION, MY_MESSAGE);
128         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
129         assertEquals(List.of(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
130
131         // arrange for send to throw an exception
132         when(pub.send(anyString(), anyString())).thenThrow(new RuntimeException(EXPECTED));
133
134         assertFalse(sink.send(MY_MESSAGE));
135
136         // no more event deliveries
137         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
138     }
139
140     @Test(expected = IllegalArgumentException.class)
141     public void testSend_NullMessage() {
142         sink.start();
143         sink.publisher = mock(BusPublisher.class);
144
145         sink.send(null);
146     }
147
148     @Test(expected = IllegalArgumentException.class)
149     public void testSend_EmptyMessage() {
150         sink.start();
151         sink.publisher = mock(BusPublisher.class);
152
153         sink.send("");
154     }
155
156     @Test(expected = IllegalStateException.class)
157     public void testSend_NotStarted() {
158         sink.publisher = mock(BusPublisher.class);
159
160         sink.send(MY_MESSAGE);
161     }
162
163     @Test
164     public void testSetPartitionKey_getPartitionKey() {
165         assertEquals(MY_PARTITION, sink.getPartitionKey());
166
167         sink.setPartitionKey("part-B");
168         assertEquals("part-B", sink.getPartitionKey());
169     }
170
171     @Test
172     public void testShutdown() {
173         BusPublisher pub = mock(BusPublisher.class);
174         sink.publisher = pub;
175
176         sink.shutdown();
177         verify(pub).close();
178     }
179
180     @Test
181     public void testAnyNullOrEmpty() {
182         assertFalse(sink.anyNullOrEmpty());
183         assertFalse(sink.anyNullOrEmpty("any-none-null", "any-none-null-B"));
184
185         assertTrue(sink.anyNullOrEmpty(null, "any-first-null"));
186         assertTrue(sink.anyNullOrEmpty("any-middle-null", null, "any-middle-null-B"));
187         assertTrue(sink.anyNullOrEmpty("any-last-null", null));
188         assertTrue(sink.anyNullOrEmpty("any-empty", ""));
189     }
190
191     @Test
192     public void testAllNullOrEmpty() {
193         assertTrue(sink.allNullOrEmpty());
194         assertTrue(sink.allNullOrEmpty(""));
195         assertTrue(sink.allNullOrEmpty(null, ""));
196
197         assertFalse(sink.allNullOrEmpty("all-ok-only-one"));
198         assertFalse(sink.allNullOrEmpty("all-ok-one", "all-ok-two"));
199         assertFalse(sink.allNullOrEmpty("all-ok-null", null));
200         assertFalse(sink.allNullOrEmpty("", "all-ok-empty"));
201         assertFalse(sink.allNullOrEmpty("", "all-one-ok", null));
202     }
203
204     @Test
205     public void testToString() {
206         assertTrue(sink.toString().startsWith("InlineBusTopicSink ["));
207     }
208
209     /**
210      * Implementation of InlineBusTopicSink that tracks the number of times that init() is
211      * invoked.
212      */
213     private static class InlineBusTopicSinkImpl extends InlineBusTopicSink {
214
215         private int initCount = 0;
216
217         public InlineBusTopicSinkImpl(BusTopicParams busTopicParams) {
218             super(busTopicParams);
219         }
220
221         @Override
222         public CommInfrastructure getTopicCommInfrastructure() {
223             return CommInfrastructure.NOOP;
224         }
225
226         @Override
227         public void init() {
228             ++initCount;
229         }
230
231     }
232 }