8e3c1fa9b3e89b5d5d436c09dbb84773e6464d81
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.ophistory;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.never;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.time.Instant;
35 import java.util.Properties;
36 import java.util.UUID;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Consumer;
41 import javax.persistence.EntityManagerFactory;
42 import org.junit.After;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.mockito.Mock;
48 import org.mockito.MockitoAnnotations;
49 import org.onap.policy.controlloop.ControlLoopOperation;
50 import org.onap.policy.controlloop.VirtualControlLoopEvent;
51 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
52
53 public class OperationHistoryDataManagerImplTest {
54
55     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
56     private static final String MY_TARGET = "my-target";
57     private static final String REQ_ID = "my-request-id";
58     private static final int BATCH_SIZE = 5;
59     private static final int MAX_QUEUE_LENGTH = 23;
60
61     private static EntityManagerFactory emf;
62
63     @Mock
64     private Thread thread;
65
66     private OperationHistoryDataManagerParams params;
67     private Consumer<EntityManagerFactory> threadFunction;
68     private VirtualControlLoopEvent event;
69     private ControlLoopOperation operation;
70     private EntityManagerFactory emfSpy;
71
72     // decremented when the thread function completes
73     private CountDownLatch finished;
74
75     private OperationHistoryDataManagerImpl mgr;
76
77
78     /**
79      * Sets up for all tests.
80      */
81     @BeforeClass
82     public static void setUpBeforeClass() {
83         OperationHistoryDataManagerParams params = makeBuilder().build();
84
85         // capture the entity manager factory for re-use
86         new OperationHistoryDataManagerImpl(params) {
87             @Override
88             protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
89                 emf = super.makeEntityManagerFactory(opsHistPu, props);
90                 return emf;
91             }
92         };
93     }
94
95     /**
96      * Restores the environment after all tests.
97      */
98     @AfterClass
99     public static void tearDownAfterClass() {
100         emf.close();
101     }
102
103     /**
104      * Sets up for an individual test.
105      */
106     @Before
107     public void setUp() {
108         MockitoAnnotations.initMocks(this);
109
110         event = new VirtualControlLoopEvent();
111         event.setRequestId(UUID.randomUUID());
112
113         operation = new ControlLoopOperation();
114         operation.setTarget(MY_TARGET);
115
116         threadFunction = null;
117         finished = new CountDownLatch(1);
118
119         // prevent the "real" emf from being closed
120         emfSpy = spy(emf);
121         doAnswer(ans -> null).when(emfSpy).close();
122
123         params = makeBuilder().build();
124
125         mgr = new PseudoThread();
126         mgr.start();
127     }
128
129     @After
130     public void tearDown() {
131         mgr.stop();
132     }
133
134     @Test
135     public void testConstructor() {
136         // use a thread and manager that haven't been started yet
137         thread = mock(Thread.class);
138         mgr = new PseudoThread();
139
140         // should not start the thread before start() is called
141         verify(thread, never()).start();
142
143         mgr.start();
144
145         // should have started the thread
146         verify(thread).start();
147
148         // invalid properties
149         params.setUrl(null);
150         assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
151                         .hasMessageContaining("data-manager-properties");
152     }
153
154     @Test
155     public void testStart() {
156         // this should have no effect
157         mgr.start();
158
159         mgr.stop();
160
161         // this should also have no effect
162         assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
163     }
164
165     @Test
166     public void testStore_testStop() throws InterruptedException {
167         // store
168         mgr.store(REQ_ID, event, operation);
169
170         runThread();
171
172         assertEquals(1, mgr.getRecordsAdded());
173     }
174
175     /**
176      * Tests stop() when the manager isn't running.
177      */
178     @Test
179     public void testStopNotRunning() {
180         // use a manager that hasn't been started yet
181         mgr = new PseudoThread();
182         mgr.stop();
183
184         verify(emfSpy).close();
185     }
186
187     /**
188      * Tests store() when it is already stopped.
189      */
190     @Test
191     public void testStoreAlreadyStopped() throws InterruptedException {
192         mgr.stop();
193
194         // store
195         mgr.store(REQ_ID, event, operation);
196
197         assertEquals(0, mgr.getRecordsAdded());
198     }
199
200     /**
201      * Tests store() when when the queue is full.
202      */
203     @Test
204     public void testStoreTooManyItems() throws InterruptedException {
205         final int nextra = 5;
206         for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
207             mgr.store(REQ_ID, event, operation);
208         }
209
210         runThread();
211
212         assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsAdded());
213     }
214
215     @Test
216     public void testRun() throws InterruptedException {
217
218         // trigger thread shutdown when it completes this batch
219         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
220             mgr.stop();
221             return emf.createEntityManager();
222         });
223
224
225         mgr = new RealThread();
226         mgr.start();
227
228         mgr.store(REQ_ID, event, operation);
229         mgr.store(REQ_ID, event, operation);
230         mgr.store(REQ_ID, event, operation);
231
232         waitForThread();
233
234         verify(emfSpy).close();
235
236         assertEquals(3, mgr.getRecordsAdded());
237     }
238
239     private void waitForThread() {
240         await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
241     }
242
243     /**
244      * Tests run() when the entity manager throws an exception.
245      */
246     @Test
247     public void testRunException() throws InterruptedException {
248         AtomicInteger count = new AtomicInteger(0);
249
250         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
251             if (count.incrementAndGet() == 2) {
252                 // interrupt during one of the attempts
253                 thread.interrupt();
254             }
255
256             // throw an exception for each record
257             throw EXPECTED_EXCEPTION;
258         });
259
260
261         mgr = new RealThread();
262         mgr.start();
263
264         mgr.store(REQ_ID, event, operation);
265         mgr.store(REQ_ID, event, operation);
266         mgr.store(REQ_ID, event, operation);
267
268         waitForThread();
269
270         verify(emfSpy).close();
271     }
272
273     /**
274      * Tests storeRemainingRecords() when the entity manager throws an exception.
275      */
276     @Test
277     public void testStoreRemainingRecordsException() throws InterruptedException {
278         // arrange to throw an exception
279         when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
280
281         mgr.store(REQ_ID, event, operation);
282
283         runThread();
284     }
285
286     @Test
287     public void testStoreRecord() throws InterruptedException {
288         // no start time
289         mgr.store(REQ_ID, event, operation);
290
291         // no start time
292         operation = new ControlLoopOperation(operation);
293         operation.setStart(Instant.now());
294         mgr.store(REQ_ID, event, operation);
295
296         // both start and end times
297         operation = new ControlLoopOperation(operation);
298         operation.setEnd(Instant.now());
299         mgr.store(REQ_ID, event, operation);
300
301         // only end time
302         operation = new ControlLoopOperation(operation);
303         operation.setStart(null);
304         mgr.store(REQ_ID, event, operation);
305
306         runThread();
307
308         // all of them should have been stored
309         assertEquals(4, mgr.getRecordsAdded());
310     }
311
312     private void runThread() throws InterruptedException {
313         if (threadFunction == null) {
314             return;
315         }
316
317         Thread thread2 = new Thread(() -> {
318             threadFunction.accept(emfSpy);
319             finished.countDown();
320         });
321
322         thread2.setDaemon(true);
323         thread2.start();
324
325         mgr.stop();
326
327         assertTrue(finished.await(5, TimeUnit.SECONDS));
328     }
329
330     private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
331         // @formatter:off
332         return OperationHistoryDataManagerParams.builder()
333                         .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
334                         .userName("sa")
335                         .password("")
336                         .batchSize(BATCH_SIZE)
337                         .maxQueueLength(MAX_QUEUE_LENGTH);
338         // @formatter:on
339     }
340
341     /**
342      * Manager that uses the shared DB.
343      */
344     private class SharedDb extends OperationHistoryDataManagerImpl {
345         public SharedDb() {
346             super(params);
347         }
348
349         @Override
350         protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
351             // re-use the same factory to avoid re-creating the DB for each test
352             return emfSpy;
353         }
354     }
355
356     /**
357      * Manager that uses the shared DB and a pseudo thread.
358      */
359     private class PseudoThread extends SharedDb {
360
361         @Override
362         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
363             threadFunction = command;
364             return thread;
365         }
366     }
367
368     /**
369      * Manager that uses the shared DB and catches the thread.
370      */
371     private class RealThread extends SharedDb {
372
373         @Override
374         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
375             thread = super.makeThread(emfactory, command);
376             return thread;
377         }
378     }
379 }