aba05887fd05d0e2cac63bdcd65a63f97b343ad7
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.fail;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
31
32 import com.att.nsa.cambria.client.CambriaConsumer;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.CountDownLatch;
38 import org.apache.commons.collections4.IteratorUtils;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
42 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
43 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
44 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
45 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
46 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
47 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
49 import org.powermock.reflect.Whitebox;
50
51 public class BusConsumerTest extends TopicTestBase {
52
53     private static final int SHORT_TIMEOUT_MILLIS = 10;
54     private static final int LONG_TIMEOUT_MILLIS = 3000;
55
56     @Before
57     @Override
58     public void setUp() {
59         super.setUp();
60     }
61
62     @Test
63     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
64
65         var cons = new FetchingBusConsumer(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
66
67             private CountDownLatch started = new CountDownLatch(1);
68
69             @Override
70             protected void sleepAfterFetchFailure() {
71                 started.countDown();
72                 super.sleepAfterFetchFailure();
73             }
74
75             @Override
76             public Iterable<String> fetch() throws IOException {
77                 return null;
78             }
79         };
80
81         // full sleep
82         long tstart = System.currentTimeMillis();
83         cons.sleepAfterFetchFailure();
84         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
85
86         // close while sleeping - sleep should halt prematurely
87         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
88         cons.started = new CountDownLatch(1);
89         Thread thread = new Thread(cons::sleepAfterFetchFailure);
90         tstart = System.currentTimeMillis();
91         thread.start();
92         cons.started.await();
93         cons.close();
94         thread.join();
95         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
96
97         // interrupt while sleeping - sleep should halt prematurely
98         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
99         cons.started = new CountDownLatch(1);
100         thread = new Thread(cons::sleepAfterFetchFailure);
101         tstart = System.currentTimeMillis();
102         thread.start();
103         cons.started.await();
104         thread.interrupt();
105         thread.join();
106         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
107     }
108
109     @Test
110     public void testCambriaConsumerWrapper() {
111         // verify that different wrappers can be built
112         new CambriaConsumerWrapper(makeBuilder().build());
113         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
114         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
115         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
116         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
117         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
118         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
119         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
120         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
121         new CambriaConsumerWrapper(makeBuilder().password(null).build());
122
123         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
124                         .doesNotThrowAnyException();
125     }
126
127     @Test
128     public void testCambriaConsumerWrapperFetch() throws Exception {
129         CambriaConsumer inner = mock(CambriaConsumer.class);
130         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
131         when(inner.fetch()).thenReturn(lst);
132
133         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
134         Whitebox.setInternalState(cons, "consumer", inner);
135
136         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
137
138         // arrange to throw exception next time fetch is called
139         IOException ex = new IOException(EXPECTED);
140         when(inner.fetch()).thenThrow(ex);
141
142         cons.fetchTimeout = 10;
143
144         try {
145             cons.fetch();
146             fail("missing exception");
147
148         } catch (IOException e) {
149             assertEquals(ex, e);
150         }
151     }
152
153     @Test
154     public void testCambriaConsumerWrapperClose() {
155         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
156         assertThatCode(() -> cons.close()).doesNotThrowAnyException();
157     }
158
159     @Test
160     public void testCambriaConsumerWrapperToString() {
161         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
162     }
163
164     @Test
165     public void testDmaapConsumerWrapper() throws Exception {
166         // verify that different wrappers can be built
167         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
168     }
169
170     @Test(expected = IllegalArgumentException.class)
171     public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
172         new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
173     }
174
175     @Test
176     public void testDmaapConsumerWrapperFetch() throws Exception {
177         DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
178         MRConsumerImpl cons = mock(MRConsumerImpl.class);
179
180         dmaap.fetchTimeout = 5;
181         dmaap.consumer = cons;
182
183         // null return
184         when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
185         assertFalse(dmaap.fetch().iterator().hasNext());
186
187         // with messages, 200
188         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
189         MRConsumerResponse resp = new MRConsumerResponse();
190         resp.setResponseCode("200");
191         resp.setActualMessages(lst);
192         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
193
194         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
195
196         // null messages
197         resp.setActualMessages(null);
198         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
199
200         assertFalse(dmaap.fetch().iterator().hasNext());
201
202         // with messages, NOT 200
203         resp.setResponseCode("400");
204         resp.setActualMessages(lst);
205         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
206
207         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
208     }
209
210     @Test
211     public void testDmaapConsumerWrapperClose() throws Exception {
212         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
213     }
214
215     @Test
216     public void testDmaapConsumerWrapperToString() throws Exception {
217         assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
218     }
219
220     @Test
221     public void testDmaapAafConsumerWrapper() throws Exception {
222         // verify that different wrappers can be built
223         new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
224         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
225                         .doesNotThrowAnyException();
226     }
227
228     @Test(expected = IllegalArgumentException.class)
229     public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
230         /*
231          * Unfortunately, the MR code intercepts this and throws an exception before the
232          * wrapper gets a chance to check it, thus this test does not improve the coverage
233          * for the constructor.
234          */
235         new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
236     }
237
238     @Test
239     public void testDmaapAafConsumerWrapperToString() throws Exception {
240         assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
241     }
242
243     @Test
244     public void testDmaapDmeConsumerWrapper() throws Exception {
245         // verify that different wrappers can be built
246         new DmaapDmeConsumerWrapper(makeBuilder().build());
247         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
248         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
249         new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
250
251         addProps.put(ROUTE_PROP, MY_ROUTE);
252         new DmaapDmeConsumerWrapper(makeBuilder().build());
253         assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
254                         .doesNotThrowAnyException();
255     }
256
257     @Test(expected = IllegalArgumentException.class)
258     public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
259         new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
260     }
261
262     @Test(expected = IllegalArgumentException.class)
263     public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
264         new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
265     }
266
267     @Test(expected = IllegalArgumentException.class)
268     public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
269         new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
270     }
271
272     @Test(expected = IllegalArgumentException.class)
273     public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
274         new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
275     }
276
277     @Test(expected = IllegalArgumentException.class)
278     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
279         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
280     }
281 }