2356746e46026b72022c61c32154e0ff365be44d
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.assertFalse;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Matchers.anyString;
28 import static org.mockito.Mockito.doThrow;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.when;
32
33 import java.util.Arrays;
34 import org.junit.After;
35 import org.junit.Before;
36 import org.junit.Test;
37 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
38 import org.onap.policy.common.endpoints.event.comm.TopicListener;
39 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
40 import org.onap.policy.common.utils.gson.GsonTestUtils;
41
42 public class InlineBusTopicSinkTest extends TopicTestBase {
43
44     private InlineBusTopicSinkImpl sink;
45
46     /**
47      * Creates the object to be tested.
48      */
49     @Before
50     @Override
51     public void setUp() {
52         super.setUp();
53
54         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
55     }
56
57     @After
58     public void tearDown() {
59         sink.shutdown();
60     }
61
62     @Test
63     public void testSerialize() {
64         new GsonTestUtils().compareGson(sink, InlineBusTopicSinkTest.class);
65     }
66
67     @Test
68     public void testInlineBusTopicSinkImpl() {
69         // verify that different wrappers can be built
70         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
71         assertEquals(MY_PARTITION, sink.getPartitionKey());
72
73         sink = new InlineBusTopicSinkImpl(makeBuilder().partitionId(null).build());
74         assertNotNull(sink.getPartitionKey());
75     }
76
77     @Test
78     public void testStart() {
79         assertTrue(sink.start());
80         assertEquals(1, sink.initCount);
81
82         // re-start, init() should not be invoked again
83         assertTrue(sink.start());
84         assertEquals(1, sink.initCount);
85     }
86
87     @Test(expected = IllegalStateException.class)
88     public void testStart_Locked() {
89         sink.lock();
90         sink.start();
91     }
92
93     @Test
94     public void testStop() {
95         BusPublisher pub = mock(BusPublisher.class);
96         sink.publisher = pub;
97
98         assertTrue(sink.stop());
99         verify(pub).close();
100
101         // stop again, shouldn't not invoke close() again
102         assertFalse(sink.stop());
103         verify(pub).close();
104
105         // publisher throws exception
106         sink = new InlineBusTopicSinkImpl(makeBuilder().build());
107         sink.publisher = pub;
108         doThrow(new RuntimeException(EXPECTED)).when(pub).close();
109         assertTrue(sink.stop());
110     }
111
112     @Test
113     public void testSend() {
114         sink.start();
115         BusPublisher pub = mock(BusPublisher.class);
116         sink.publisher = pub;
117
118         TopicListener listener = mock(TopicListener.class);
119         sink.register(listener);
120
121         assertTrue(sink.send(MY_MESSAGE));
122
123         verify(pub).send(MY_PARTITION, MY_MESSAGE);
124         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
125         assertEquals(Arrays.asList(MY_MESSAGE), Arrays.asList(sink.getRecentEvents()));
126
127         // arrange for send to throw an exception
128         when(pub.send(anyString(), anyString())).thenThrow(new RuntimeException(EXPECTED));
129
130         assertFalse(sink.send(MY_MESSAGE));
131
132         // no more event deliveries
133         verify(listener).onTopicEvent(CommInfrastructure.NOOP, MY_TOPIC, MY_MESSAGE);
134     }
135
136     @Test(expected = IllegalArgumentException.class)
137     public void testSend_NullMessage() {
138         sink.start();
139         BusPublisher pub = mock(BusPublisher.class);
140         sink.publisher = pub;
141
142         sink.send(null);
143     }
144
145     @Test(expected = IllegalArgumentException.class)
146     public void testSend_EmptyMessage() {
147         sink.start();
148         BusPublisher pub = mock(BusPublisher.class);
149         sink.publisher = pub;
150
151         sink.send("");
152     }
153
154     @Test(expected = IllegalStateException.class)
155     public void testSend_NotStarted() {
156         BusPublisher pub = mock(BusPublisher.class);
157         sink.publisher = pub;
158
159         sink.send(MY_MESSAGE);
160     }
161
162     @Test
163     public void testSetPartitionKey_getPartitionKey() {
164         assertEquals(MY_PARTITION, sink.getPartitionKey());
165
166         sink.setPartitionKey("part-B");
167         assertEquals("part-B", sink.getPartitionKey());
168     }
169
170     @Test
171     public void testShutdown() {
172         BusPublisher pub = mock(BusPublisher.class);
173         sink.publisher = pub;
174
175         sink.shutdown();
176         verify(pub).close();
177     }
178
179     @Test
180     public void testAnyNullOrEmpty() {
181         assertFalse(sink.anyNullOrEmpty());
182         assertFalse(sink.anyNullOrEmpty("any-none-null", "any-none-null-B"));
183
184         assertTrue(sink.anyNullOrEmpty(null, "any-first-null"));
185         assertTrue(sink.anyNullOrEmpty("any-middle-null", null, "any-middle-null-B"));
186         assertTrue(sink.anyNullOrEmpty("any-last-null", null));
187         assertTrue(sink.anyNullOrEmpty("any-empty", ""));
188     }
189
190     @Test
191     public void testAllNullOrEmpty() {
192         assertTrue(sink.allNullOrEmpty());
193         assertTrue(sink.allNullOrEmpty(""));
194         assertTrue(sink.allNullOrEmpty(null, ""));
195
196         assertFalse(sink.allNullOrEmpty("all-ok-only-one"));
197         assertFalse(sink.allNullOrEmpty("all-ok-one", "all-ok-two"));
198         assertFalse(sink.allNullOrEmpty("all-ok-null", null));
199         assertFalse(sink.allNullOrEmpty("", "all-ok-empty"));
200         assertFalse(sink.allNullOrEmpty("", "all-one-ok", null));
201     }
202
203     @Test
204     public void testToString() {
205         assertTrue(sink.toString().startsWith("InlineBusTopicSink ["));
206     }
207
208     /**
209      * Implementation of InlineBusTopicSink that tracks the number of times that init() is
210      * invoked.
211      */
212     private static class InlineBusTopicSinkImpl extends InlineBusTopicSink {
213
214         private int initCount = 0;
215
216         public InlineBusTopicSinkImpl(BusTopicParams busTopicParams) {
217             super(busTopicParams);
218         }
219
220         @Override
221         public CommInfrastructure getTopicCommInfrastructure() {
222             return CommInfrastructure.NOOP;
223         }
224
225         @Override
226         public void init() {
227             ++initCount;
228         }
229
230     }
231 }