cb6eb107742d9df908017a4c5760fc7c37bb3def
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Matchers.anyString;
28 import static org.mockito.Mockito.doThrow;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32
33 import java.util.Arrays;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
38 import org.onap.policy.common.endpoints.event.comm.TopicListener;
39 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
40 import org.onap.policy.common.utils.gson.GsonTestUtils;
41
42 public class InlineBusTopicSinkTest extends TopicTestBase {
43
44     private InlineBusTopicSinkImpl sink;
45
46     /**
47      * Creates the object to be tested.
48      */
49     @Before
50     public void setUp() {
51         super.setUp();
52
53         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
54     }
55
56     @After
57     public void tearDown() {
58         sink.shutdown();
59     }
60
61     @Test
62     public void testSerialize() {
63         new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class);
64     }
65
66     @Test
67     public void testInlineBusTopicSinkImpl() {
68         // verify that different wrappers can be built
69         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
70         assertEquals(MY_PARTITION, sink.getPartitionKey());
71
72         sink = new InlineBusTopicSinkImpl(makeBuilder().partitionId(null).build());
73         assertNotNull(sink.getPartitionKey());
74     }
75
76     @Test
77     public void testStart() {
78         assertTrue(sink.start());
79         assertEquals(1, sink.initCount);
80
81         // re-start, init() should not be invoked again
82         assertTrue(sink.start());
83         assertEquals(1, sink.initCount);
84     }
85
86     @Test(expected = IllegalStateException.class)
87     public void testStart_Locked() {
88         sink.lock();
89         sink.start();
90     }
91
92     @Test
93     public void testStop() {
94         BusPublisher pub = mock(BusPublisher.class);
95         sink.publisher = pub;
96
97         assertTrue(sink.stop());
98         verify(pub).close();
99
100         // stop again, shouldn't not invoke close() again
101         assertFalse(sink.stop());
102         verify(pub).close();
103
104         // publisher throws exception
105         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
106         sink.publisher = pub;
107         doThrow(new RuntimeException(EXPECTED)).when(pub).close();
108         assertTrue(sink.stop());
109     }
110
111     @Test
112     public void testSend() {
113         sink.start();
114         BusPublisher pub = mock(BusPublisher.class);
115         sink.publisher = pub;
116
117         TopicListener listener = mock(TopicListener.class);
118         sink.register(listener);
119
120         assertTrue(sink.send(MY_MESSAGE));
121
122         verify(pub).send(MY_PARTITION, MY_MESSAGE);
123         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
124         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
125
126         // arrange for send to throw an exception
127         when(pub.send(anyString(), anyString())).thenThrow(new RuntimeException(EXPECTED));
128
129         assertFalse(sink.send(MY_MESSAGE));
130
131         // no more event deliveries
132         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
133     }
134
135     @Test(expected = IllegalArgumentException.class)
136     public void testSend_NullMessage() {
137         sink.start();
138         BusPublisher pub = mock(BusPublisher.class);
139         sink.publisher = pub;
140
141         sink.send(null);
142     }
143
144     @Test(expected = IllegalArgumentException.class)
145     public void testSend_EmptyMessage() {
146         sink.start();
147         BusPublisher pub = mock(BusPublisher.class);
148         sink.publisher = pub;
149
150         sink.send("");
151     }
152
153     @Test(expected = IllegalStateException.class)
154     public void testSend_NotStarted() {
155         BusPublisher pub = mock(BusPublisher.class);
156         sink.publisher = pub;
157
158         sink.send(MY_MESSAGE);
159     }
160
161     @Test
162     public void testSetPartitionKey_getPartitionKey() {
163         assertEquals(MY_PARTITION, sink.getPartitionKey());
164
165         sink.setPartitionKey("part-B");
166         assertEquals("part-B", sink.getPartitionKey());
167     }
168
169     @Test
170     public void testShutdown() {
171         BusPublisher pub = mock(BusPublisher.class);
172         sink.publisher = pub;
173
174         sink.shutdown();
175         verify(pub).close();
176     }
177
178     @Test
179     public void testAnyNullOrEmpty() {
180         assertFalse(sink.anyNullOrEmpty());
181         assertFalse(sink.anyNullOrEmpty("any-none-null", "any-none-null-B"));
182
183         assertTrue(sink.anyNullOrEmpty(null, "any-first-null"));
184         assertTrue(sink.anyNullOrEmpty("any-middle-null", null, "any-middle-null-B"));
185         assertTrue(sink.anyNullOrEmpty("any-last-null", null));
186         assertTrue(sink.anyNullOrEmpty("any-empty", ""));
187     }
188
189     @Test
190     public void testAllNullOrEmpty() {
191         assertTrue(sink.allNullOrEmpty());
192         assertTrue(sink.allNullOrEmpty(""));
193         assertTrue(sink.allNullOrEmpty(null, ""));
194
195         assertFalse(sink.allNullOrEmpty("all-ok-only-one"));
196         assertFalse(sink.allNullOrEmpty("all-ok-one", "all-ok-two"));
197         assertFalse(sink.allNullOrEmpty("all-ok-null", null));
198         assertFalse(sink.allNullOrEmpty("", "all-ok-empty"));
199         assertFalse(sink.allNullOrEmpty("", "all-one-ok", null));
200     }
201
202     @Test
203     public void testToString() {
204         assertTrue(sink.toString().startsWith("InlineBusTopicSink ["));
205     }
206
207     /**
208      * Implementation of InlineBusTopicSink that tracks the number of times that init() is
209      * invoked.
210      */
211     private static class InlineBusTopicSinkImpl extends InlineBusTopicSink {
212
213         private int initCount = 0;
214
215         public InlineBusTopicSinkImpl(BusTopicParams busTopicParams) {
216             super(busTopicParams);
217         }
218
219         @Override
220         public CommInfrastructure getTopicCommInfrastructure() {
221             return CommInfrastructure.NOOP;
222         }
223
224         @Override
225         public void init() {
226             ++initCount;
227         }
228
229     }
230 }