9d32a85e123d07904f82abae4143e17726c53333
[policy/drools-applications.git] / controlloop / common / eventmanager / src / test / java / org / onap / policy / controlloop / ophistory / OperationHistoryDataManagerImplTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.ophistory;
22
23 import static org.assertj.core.api.Assertions.assertThatCode;
24 import static org.awaitility.Awaitility.await;
25 import static org.junit.Assert.assertEquals;
26 import static org.junit.Assert.assertTrue;
27 import static org.mockito.Mockito.doAnswer;
28 import static org.mockito.Mockito.mock;
29 import static org.mockito.Mockito.never;
30 import static org.mockito.Mockito.spy;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.when;
33
34 import java.time.Instant;
35 import java.util.Properties;
36 import java.util.UUID;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import java.util.function.Consumer;
41 import javax.persistence.EntityManagerFactory;
42 import org.junit.After;
43 import org.junit.AfterClass;
44 import org.junit.Before;
45 import org.junit.BeforeClass;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.Mock;
49 import org.mockito.junit.MockitoJUnitRunner;
50 import org.onap.policy.controlloop.ControlLoopOperation;
51 import org.onap.policy.controlloop.VirtualControlLoopEvent;
52 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
53
54 @RunWith(MockitoJUnitRunner.class)
55 public class OperationHistoryDataManagerImplTest {
56
57     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
58     private static final String MY_LOOP_NAME = "my-loop-name";
59     private static final String MY_ACTOR = "my-actor";
60     private static final String MY_OPERATION = "my-operation";
61     private static final String MY_TARGET = "my-target";
62     private static final String MY_ENTITY = "my-entity";
63     private static final String REQ_ID = "my-request-id";
64     private static final int BATCH_SIZE = 5;
65     private static final int MAX_QUEUE_LENGTH = 23;
66
67     private static EntityManagerFactory emf;
68
69     @Mock
70     private Thread thread;
71
72     private OperationHistoryDataManagerParams params;
73     private Consumer<EntityManagerFactory> threadFunction;
74     private VirtualControlLoopEvent event;
75     private ControlLoopOperation operation;
76     private EntityManagerFactory emfSpy;
77
78     // decremented when the thread function completes
79     private CountDownLatch finished;
80
81     private OperationHistoryDataManagerImpl mgr;
82
83
84     /**
85      * Sets up for all tests.
86      */
87     @BeforeClass
88     public static void setUpBeforeClass() {
89         OperationHistoryDataManagerParams params = makeBuilder().build();
90
91         // capture the entity manager factory for re-use
92         new OperationHistoryDataManagerImpl(params) {
93             @Override
94             protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
95                 emf = super.makeEntityManagerFactory(opsHistPu, props);
96                 return emf;
97             }
98         };
99     }
100
101     /**
102      * Restores the environment after all tests.
103      */
104     @AfterClass
105     public static void tearDownAfterClass() {
106         emf.close();
107     }
108
109     /**
110      * Sets up for an individual test.
111      */
112     @Before
113     public void setUp() {
114         event = new VirtualControlLoopEvent();
115         event.setClosedLoopControlName(MY_LOOP_NAME);
116         event.setRequestId(UUID.randomUUID());
117
118         operation = new ControlLoopOperation();
119         operation.setActor(MY_ACTOR);
120         operation.setOperation(MY_OPERATION);
121         operation.setTarget(MY_TARGET);
122         operation.setSubRequestId(UUID.randomUUID().toString());
123
124         threadFunction = null;
125         finished = new CountDownLatch(1);
126
127         // prevent the "real" emf from being closed
128         emfSpy = spy(emf);
129         doAnswer(ans -> null).when(emfSpy).close();
130
131         params = makeBuilder().build();
132
133         mgr = new PseudoThread();
134         mgr.start();
135     }
136
137     @After
138     public void tearDown() {
139         mgr.stop();
140     }
141
142     @Test
143     public void testConstructor() {
144         // use a thread and manager that haven't been started yet
145         thread = mock(Thread.class);
146         mgr = new PseudoThread();
147
148         // should not start the thread before start() is called
149         verify(thread, never()).start();
150
151         mgr.start();
152
153         // should have started the thread
154         verify(thread).start();
155
156         // invalid properties
157         params.setUrl(null);
158         assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
159                         .hasMessageContaining("data-manager-properties");
160     }
161
162     @Test
163     public void testStart() {
164         // this should have no effect
165         mgr.start();
166
167         mgr.stop();
168
169         // this should also have no effect
170         assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
171     }
172
173     @Test
174     public void testStore_testStop() throws InterruptedException {
175         // store
176         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
177
178         runThread();
179
180         assertEquals(1, mgr.getRecordsCommitted());
181     }
182
183     /**
184      * Tests stop() when the manager isn't running.
185      */
186     @Test
187     public void testStopNotRunning() {
188         // use a manager that hasn't been started yet
189         mgr = new PseudoThread();
190         mgr.stop();
191
192         verify(emfSpy).close();
193     }
194
195     /**
196      * Tests store() when it is already stopped.
197      */
198     @Test
199     public void testStoreAlreadyStopped() throws InterruptedException {
200         mgr.stop();
201
202         // store
203         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
204
205         assertEquals(0, mgr.getRecordsCommitted());
206     }
207
208     /**
209      * Tests store() when when the queue is full.
210      */
211     @Test
212     public void testStoreTooManyItems() throws InterruptedException {
213         final int nextra = 5;
214         for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
215             mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
216         }
217
218         runThread();
219
220         assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsCommitted());
221     }
222
223     @Test
224     public void testRun() throws InterruptedException {
225
226         // trigger thread shutdown when it completes this batch
227         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
228             mgr.stop();
229             return emf.createEntityManager();
230         });
231
232
233         mgr = new RealThread();
234         mgr.start();
235
236         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
237         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
238         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
239
240         waitForThread();
241
242         verify(emfSpy).close();
243
244         assertEquals(3, mgr.getRecordsCommitted());
245     }
246
247     private void waitForThread() {
248         await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
249     }
250
251     /**
252      * Tests run() when the entity manager throws an exception.
253      */
254     @Test
255     public void testRunException() throws InterruptedException {
256         AtomicInteger count = new AtomicInteger(0);
257
258         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
259             if (count.incrementAndGet() == 2) {
260                 // interrupt during one of the attempts
261                 thread.interrupt();
262             }
263
264             // throw an exception for each record
265             throw EXPECTED_EXCEPTION;
266         });
267
268
269         mgr = new RealThread();
270         mgr.start();
271
272         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
273         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
274         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
275
276         waitForThread();
277
278         verify(emfSpy).close();
279     }
280
281     /**
282      * Tests storeRemainingRecords() when the entity manager throws an exception.
283      */
284     @Test
285     public void testStoreRemainingRecordsException() throws InterruptedException {
286         // arrange to throw an exception
287         when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
288
289         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
290
291         runThread();
292     }
293
294     @Test
295     public void testStoreRecord() throws InterruptedException {
296         /*
297          * Note: we change sub-request ID each time to guarantee that the records are
298          * unique.
299          */
300
301         // no start time
302         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
303
304         // no end time
305         operation = new ControlLoopOperation(operation);
306         operation.setSubRequestId(UUID.randomUUID().toString());
307         operation.setStart(Instant.now());
308         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
309
310         // both start and end times
311         operation = new ControlLoopOperation(operation);
312         operation.setSubRequestId(UUID.randomUUID().toString());
313         operation.setEnd(Instant.now());
314         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
315
316         // only end time
317         operation = new ControlLoopOperation(operation);
318         operation.setSubRequestId(UUID.randomUUID().toString());
319         operation.setStart(null);
320         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
321
322         runThread();
323
324         // all of them should have been stored
325         assertEquals(4, mgr.getRecordsCommitted());
326
327         // each was unique
328         assertEquals(4, mgr.getRecordsInserted());
329         assertEquals(0, mgr.getRecordsUpdated());
330     }
331
332     /**
333      * Tests storeRecord() when records are updated.
334      */
335     @Test
336     public void testStoreRecordUpdate() throws InterruptedException {
337         /*
338          * Note: we do NOT change sub-request ID, so that records all refer to the same DB
339          * record.
340          */
341
342         // no start time
343         operation.setStart(null);
344         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
345
346         // no end time
347         operation = new ControlLoopOperation(operation);
348         operation.setStart(Instant.now());
349         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
350
351         // both start and end times
352         operation = new ControlLoopOperation(operation);
353         operation.setEnd(Instant.now());
354         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
355
356         // only end time
357         operation = new ControlLoopOperation(operation);
358         operation.setStart(null);
359         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
360
361         runThread();
362
363         // all of them should have been stored
364         assertEquals(4, mgr.getRecordsCommitted());
365
366         // only one new record
367         assertEquals(1, mgr.getRecordsInserted());
368
369         // remainder were updates
370         assertEquals(3, mgr.getRecordsUpdated());
371     }
372
373     private void runThread() throws InterruptedException {
374         if (threadFunction == null) {
375             return;
376         }
377
378         Thread thread2 = new Thread(() -> {
379             threadFunction.accept(emfSpy);
380             finished.countDown();
381         });
382
383         thread2.setDaemon(true);
384         thread2.start();
385
386         mgr.stop();
387
388         assertTrue(finished.await(5, TimeUnit.SECONDS));
389     }
390
391     private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
392         // @formatter:off
393         return OperationHistoryDataManagerParams.builder()
394                         .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
395                         .userName("sa")
396                         .password("")
397                         .batchSize(BATCH_SIZE)
398                         .maxQueueLength(MAX_QUEUE_LENGTH);
399         // @formatter:on
400     }
401
402     /**
403      * Manager that uses the shared DB.
404      */
405     private class SharedDb extends OperationHistoryDataManagerImpl {
406         public SharedDb() {
407             super(params);
408         }
409
410         @Override
411         protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
412             // re-use the same factory to avoid re-creating the DB for each test
413             return emfSpy;
414         }
415     }
416
417     /**
418      * Manager that uses the shared DB and a pseudo thread.
419      */
420     private class PseudoThread extends SharedDb {
421
422         @Override
423         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
424             threadFunction = command;
425             return thread;
426         }
427     }
428
429     /**
430      * Manager that uses the shared DB and catches the thread.
431      */
432     private class RealThread extends SharedDb {
433
434         @Override
435         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
436             thread = super.makeThread(emfactory, command);
437             return thread;
438         }
439     }
440 }