21050f9754e39e0821c221e32832e5938c96e3ce
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertFalse;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.fail;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.when;
31
32 import com.att.nsa.cambria.client.CambriaConsumer;
33 import java.io.IOException;
34 import java.util.Arrays;
35 import java.util.Collections;
36 import java.util.List;
37 import java.util.concurrent.CountDownLatch;
38 import org.apache.commons.collections4.IteratorUtils;
39 import org.junit.Before;
40 import org.junit.Test;
41 import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
42 import org.onap.dmaap.mr.client.response.MRConsumerResponse;
43 import org.onap.policy.common.endpoints.event.comm.bus.TopicTestBase;
44 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.CambriaConsumerWrapper;
45 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapAafConsumerWrapper;
46 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapConsumerWrapper;
47 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.DmaapDmeConsumerWrapper;
48 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusConsumer.FetchingBusConsumer;
49 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
50 import org.powermock.reflect.Whitebox;
51
52 public class BusConsumerTest extends TopicTestBase {
53
54     private static final int SHORT_TIMEOUT_MILLIS = 10;
55     private static final int LONG_TIMEOUT_MILLIS = 3000;
56
57     @Before
58     @Override
59     public void setUp() {
60         super.setUp();
61     }
62
63     @Test
64     public void testFetchingBusConsumer() throws InterruptedException {
65         // should not be negative
66         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(-1).build());
67         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
68
69         // should not be zero
70         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(0).build());
71         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
72
73         // should not be too large
74         cons = new FetchingBusConsumerImpl(
75                         makeBuilder().fetchTimeout(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH + 100).build());
76         assertThat(cons.getSleepTime()).isEqualTo(PolicyEndPointProperties.DEFAULT_TIMEOUT_MS_FETCH);
77
78         // should not be what was specified
79         cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(100).build());
80         assertThat(cons.getSleepTime()).isEqualTo(100);
81     }
82
83     @Test
84     public void testFetchingBusConsumerSleepAfterFetchFailure() throws InterruptedException {
85
86         var cons = new FetchingBusConsumerImpl(makeBuilder().fetchTimeout(SHORT_TIMEOUT_MILLIS).build()) {
87
88             private CountDownLatch started = new CountDownLatch(1);
89
90             @Override
91             protected void sleepAfterFetchFailure() {
92                 started.countDown();
93                 super.sleepAfterFetchFailure();
94             }
95         };
96
97         // full sleep
98         long tstart = System.currentTimeMillis();
99         cons.sleepAfterFetchFailure();
100         assertThat(System.currentTimeMillis() - tstart).isGreaterThanOrEqualTo(SHORT_TIMEOUT_MILLIS);
101
102         // close while sleeping - sleep should halt prematurely
103         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
104         cons.started = new CountDownLatch(1);
105         Thread thread = new Thread(cons::sleepAfterFetchFailure);
106         tstart = System.currentTimeMillis();
107         thread.start();
108         cons.started.await();
109         cons.close();
110         thread.join();
111         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
112
113         // interrupt while sleeping - sleep should halt prematurely
114         cons.fetchTimeout = LONG_TIMEOUT_MILLIS;
115         cons.started = new CountDownLatch(1);
116         thread = new Thread(cons::sleepAfterFetchFailure);
117         tstart = System.currentTimeMillis();
118         thread.start();
119         cons.started.await();
120         thread.interrupt();
121         thread.join();
122         assertThat(System.currentTimeMillis() - tstart).isLessThan(LONG_TIMEOUT_MILLIS);
123     }
124
125     @Test
126     public void testCambriaConsumerWrapper() {
127         // verify that different wrappers can be built
128         new CambriaConsumerWrapper(makeBuilder().build());
129         new CambriaConsumerWrapper(makeBuilder().useHttps(false).build());
130         new CambriaConsumerWrapper(makeBuilder().useHttps(true).build());
131         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(false).build());
132         new CambriaConsumerWrapper(makeBuilder().useHttps(true).allowSelfSignedCerts(true).build());
133         new CambriaConsumerWrapper(makeBuilder().apiKey(null).build());
134         new CambriaConsumerWrapper(makeBuilder().apiSecret(null).build());
135         new CambriaConsumerWrapper(makeBuilder().apiKey(null).apiSecret(null).build());
136         new CambriaConsumerWrapper(makeBuilder().userName(null).build());
137         new CambriaConsumerWrapper(makeBuilder().password(null).build());
138
139         assertThatCode(() -> new CambriaConsumerWrapper(makeBuilder().userName(null).password(null).build()))
140                         .doesNotThrowAnyException();
141     }
142
143     @Test
144     public void testCambriaConsumerWrapperFetch() throws Exception {
145         CambriaConsumer inner = mock(CambriaConsumer.class);
146         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
147         when(inner.fetch()).thenReturn(lst);
148
149         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
150         Whitebox.setInternalState(cons, "consumer", inner);
151
152         assertEquals(lst, IteratorUtils.toList(cons.fetch().iterator()));
153
154         // arrange to throw exception next time fetch is called
155         IOException ex = new IOException(EXPECTED);
156         when(inner.fetch()).thenThrow(ex);
157
158         cons.fetchTimeout = 10;
159
160         try {
161             cons.fetch();
162             fail("missing exception");
163
164         } catch (IOException e) {
165             assertEquals(ex, e);
166         }
167     }
168
169     @Test
170     public void testCambriaConsumerWrapperClose() {
171         CambriaConsumerWrapper cons = new CambriaConsumerWrapper(builder.build());
172         assertThatCode(() -> cons.close()).doesNotThrowAnyException();
173     }
174
175     @Test
176     public void testCambriaConsumerWrapperToString() {
177         assertNotNull(new CambriaConsumerWrapper(makeBuilder().build()).toString());
178     }
179
180     @Test
181     public void testDmaapConsumerWrapper() throws Exception {
182         // verify that different wrappers can be built
183         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build())).doesNotThrowAnyException();
184     }
185
186     @Test(expected = IllegalArgumentException.class)
187     public void testDmaapConsumerWrapper_InvalidTopic() throws Exception {
188         new DmaapAafConsumerWrapper(makeBuilder().topic(null).build());
189     }
190
191     @Test
192     public void testDmaapConsumerWrapperFetch() throws Exception {
193         DmaapAafConsumerWrapper dmaap = new DmaapAafConsumerWrapper(makeBuilder().build());
194         MRConsumerImpl cons = mock(MRConsumerImpl.class);
195
196         dmaap.fetchTimeout = 5;
197         dmaap.consumer = cons;
198
199         // null return
200         when(cons.fetchWithReturnConsumerResponse()).thenReturn(null);
201         assertFalse(dmaap.fetch().iterator().hasNext());
202
203         // with messages, 200
204         List<String> lst = Arrays.asList(MY_MESSAGE, MY_MESSAGE2);
205         MRConsumerResponse resp = new MRConsumerResponse();
206         resp.setResponseCode("200");
207         resp.setActualMessages(lst);
208         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
209
210         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
211
212         // null messages
213         resp.setActualMessages(null);
214         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
215
216         assertFalse(dmaap.fetch().iterator().hasNext());
217
218         // with messages, NOT 200
219         resp.setResponseCode("400");
220         resp.setActualMessages(lst);
221         when(cons.fetchWithReturnConsumerResponse()).thenReturn(resp);
222
223         assertEquals(lst, IteratorUtils.toList(dmaap.fetch().iterator()));
224     }
225
226     @Test
227     public void testDmaapConsumerWrapperClose() throws Exception {
228         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().build()).close()).doesNotThrowAnyException();
229     }
230
231     @Test
232     public void testDmaapConsumerWrapperToString() throws Exception {
233         assertNotNull(new DmaapConsumerWrapper(makeBuilder().build()) {}.toString());
234     }
235
236     @Test
237     public void testDmaapAafConsumerWrapper() throws Exception {
238         // verify that different wrappers can be built
239         new DmaapAafConsumerWrapper(makeBuilder().useHttps(true).build());
240         assertThatCode(() -> new DmaapAafConsumerWrapper(makeBuilder().useHttps(false).build()))
241                         .doesNotThrowAnyException();
242     }
243
244     @Test(expected = IllegalArgumentException.class)
245     public void testDmaapAafConsumerWrapper_InvalidServers() throws Exception {
246         /*
247          * Unfortunately, the MR code intercepts this and throws an exception before the
248          * wrapper gets a chance to check it, thus this test does not improve the coverage
249          * for the constructor.
250          */
251         new DmaapAafConsumerWrapper(makeBuilder().servers(Collections.emptyList()).build());
252     }
253
254     @Test
255     public void testDmaapAafConsumerWrapperToString() throws Exception {
256         assertNotNull(new DmaapAafConsumerWrapper(makeBuilder().build()).toString());
257     }
258
259     @Test
260     public void testDmaapDmeConsumerWrapper() throws Exception {
261         // verify that different wrappers can be built
262         new DmaapDmeConsumerWrapper(makeBuilder().build());
263         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(true).build());
264         new DmaapDmeConsumerWrapper(makeBuilder().useHttps(false).build());
265         new DmaapDmeConsumerWrapper(makeBuilder().additionalProps(null).build());
266
267         addProps.put(ROUTE_PROP, MY_ROUTE);
268         new DmaapDmeConsumerWrapper(makeBuilder().build());
269         assertThatCode(() -> new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build()))
270                         .doesNotThrowAnyException();
271     }
272
273     @Test(expected = IllegalArgumentException.class)
274     public void testDmaapDmeConsumerWrapper_InvalidEnvironment() throws Exception {
275         new DmaapDmeConsumerWrapper(makeBuilder().environment(null).build());
276     }
277
278     @Test(expected = IllegalArgumentException.class)
279     public void testDmaapDmeConsumerWrapper_InvalidAft() throws Exception {
280         new DmaapDmeConsumerWrapper(makeBuilder().aftEnvironment(null).build());
281     }
282
283     @Test(expected = IllegalArgumentException.class)
284     public void testDmaapDmeConsumerWrapper_InvalidLat() throws Exception {
285         new DmaapDmeConsumerWrapper(makeBuilder().latitude(null).build());
286     }
287
288     @Test(expected = IllegalArgumentException.class)
289     public void testDmaapDmeConsumerWrapper_InvalidLong() throws Exception {
290         new DmaapDmeConsumerWrapper(makeBuilder().longitude(null).build());
291     }
292
293     @Test(expected = IllegalArgumentException.class)
294     public void testDmaapDmeConsumerWrapper_InvalidPartner() throws Exception {
295         new DmaapDmeConsumerWrapper(makeBuilder().partner(null).build());
296     }
297
298     private static class FetchingBusConsumerImpl extends FetchingBusConsumer {
299
300         protected FetchingBusConsumerImpl(BusTopicParams busTopicParams) {
301             super(busTopicParams);
302         }
303
304         @Override
305         public Iterable<String> fetch() throws IOException {
306             return null;
307         }
308     }
309 }