Replace Eclipselink with Hibernate
[policy/drools-applications.git] / controlloop / common / eventmanager / src / test / java / org / onap / policy / controlloop / ophistory / OperationHistoryDataManagerImplTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.ophistory;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.awaitility.Awaitility.await;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import java.time.Instant;
36 import java.util.Properties;
37 import java.util.UUID;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.function.Consumer;
42 import javax.persistence.EntityManagerFactory;
43 import org.junit.After;
44 import org.junit.AfterClass;
45 import org.junit.Before;
46 import org.junit.BeforeClass;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.Mock;
50 import org.mockito.junit.MockitoJUnitRunner;
51 import org.onap.policy.controlloop.ControlLoopOperation;
52 import org.onap.policy.controlloop.VirtualControlLoopEvent;
53 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
54
55 @RunWith(MockitoJUnitRunner.class)
56 public class OperationHistoryDataManagerImplTest {
57
58     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
59     private static final String MY_LOOP_NAME = "my-loop-name";
60     private static final String MY_ACTOR = "my-actor";
61     private static final String MY_OPERATION = "my-operation";
62     private static final String MY_TARGET = "my-target";
63     private static final String MY_ENTITY = "my-entity";
64     private static final String REQ_ID = "my-request-id";
65     private static final int BATCH_SIZE = 5;
66     private static final int MAX_QUEUE_LENGTH = 23;
67
68     private static EntityManagerFactory emf;
69
70     @Mock
71     private Thread thread;
72
73     private OperationHistoryDataManagerParams params;
74     private Consumer<EntityManagerFactory> threadFunction;
75     private VirtualControlLoopEvent event;
76     private ControlLoopOperation operation;
77     private EntityManagerFactory emfSpy;
78
79     // decremented when the thread function completes
80     private CountDownLatch finished;
81
82     private OperationHistoryDataManagerImpl mgr;
83
84
85     /**
86      * Sets up for all tests.
87      */
88     @BeforeClass
89     public static void setUpBeforeClass() {
90         OperationHistoryDataManagerParams params = makeBuilder().build();
91
92         // capture the entity manager factory for re-use
93         new OperationHistoryDataManagerImpl(params) {
94             @Override
95             protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
96                 emf = super.makeEntityManagerFactory(opsHistPu, props);
97                 return emf;
98             }
99         };
100     }
101
102     /**
103      * Restores the environment after all tests.
104      */
105     @AfterClass
106     public static void tearDownAfterClass() {
107         emf.close();
108     }
109
110     /**
111      * Sets up for an individual test.
112      */
113     @Before
114     public void setUp() {
115         event = new VirtualControlLoopEvent();
116         event.setClosedLoopControlName(MY_LOOP_NAME);
117         event.setRequestId(UUID.randomUUID());
118
119         operation = new ControlLoopOperation();
120         operation.setActor(MY_ACTOR);
121         operation.setOperation(MY_OPERATION);
122         operation.setTarget(MY_TARGET);
123         operation.setSubRequestId(UUID.randomUUID().toString());
124
125         threadFunction = null;
126         finished = new CountDownLatch(1);
127
128         // prevent the "real" emf from being closed
129         emfSpy = spy(emf);
130         doAnswer(ans -> null).when(emfSpy).close();
131
132         params = makeBuilder().build();
133
134         mgr = new PseudoThread();
135         mgr.start();
136     }
137
138     @After
139     public void tearDown() {
140         mgr.stop();
141     }
142
143     @Test
144     public void testConstructor() {
145         // use a thread and manager that haven't been started yet
146         thread = mock(Thread.class);
147         mgr = new PseudoThread();
148
149         // should not start the thread before start() is called
150         verify(thread, never()).start();
151
152         mgr.start();
153
154         // should have started the thread
155         verify(thread).start();
156
157         // invalid properties
158         params.setUrl(null);
159         assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
160                         .hasMessageContaining("data-manager-properties");
161     }
162
163     @Test
164     public void testStart() {
165         // this should have no effect
166         mgr.start();
167
168         mgr.stop();
169
170         // this should also have no effect
171         assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
172     }
173
174     @Test
175     public void testStore_testStop() throws InterruptedException {
176         // store
177         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
178
179         runThread();
180
181         assertEquals(1, mgr.getRecordsCommitted());
182     }
183
184     /**
185      * Tests stop() when the manager isn't running.
186      */
187     @Test
188     public void testStopNotRunning() {
189         // use a manager that hasn't been started yet
190         mgr = new PseudoThread();
191         mgr.stop();
192
193         verify(emfSpy).close();
194     }
195
196     /**
197      * Tests store() when it is already stopped.
198      */
199     @Test
200     public void testStoreAlreadyStopped() throws InterruptedException {
201         mgr.stop();
202
203         // store
204         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
205
206         assertEquals(0, mgr.getRecordsCommitted());
207     }
208
209     /**
210      * Tests store() when when the queue is full.
211      */
212     @Test
213     public void testStoreTooManyItems() throws InterruptedException {
214         final int nextra = 5;
215         for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
216             mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
217         }
218
219         runThread();
220
221         assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsCommitted());
222     }
223
224     @Test
225     public void testRun() throws InterruptedException {
226
227         // trigger thread shutdown when it completes this batch
228         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
229             mgr.stop();
230             return emf.createEntityManager();
231         });
232
233
234         mgr = new RealThread();
235         mgr.start();
236
237         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
238         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
239         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
240
241         waitForThread();
242
243         verify(emfSpy).close();
244
245         assertEquals(3, mgr.getRecordsCommitted());
246     }
247
248     private void waitForThread() {
249         await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
250     }
251
252     /**
253      * Tests run() when the entity manager throws an exception.
254      */
255     @Test
256     public void testRunException() throws InterruptedException {
257         AtomicInteger count = new AtomicInteger(0);
258
259         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
260             if (count.incrementAndGet() == 2) {
261                 // interrupt during one of the attempts
262                 thread.interrupt();
263             }
264
265             // throw an exception for each record
266             throw EXPECTED_EXCEPTION;
267         });
268
269
270         mgr = new RealThread();
271         mgr.start();
272
273         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
274         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
275         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
276
277         waitForThread();
278
279         verify(emfSpy).close();
280     }
281
282     /**
283      * Tests storeRemainingRecords() when the entity manager throws an exception.
284      */
285     @Test
286     public void testStoreRemainingRecordsException() throws InterruptedException {
287         // arrange to throw an exception
288         when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
289
290         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
291
292         runThread();
293     }
294
295     @Test
296     public void testStoreRecord() throws InterruptedException {
297         /*
298          * Note: we change sub-request ID each time to guarantee that the records are
299          * unique.
300          */
301
302         // no start time
303         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
304
305         // no end time
306         operation = new ControlLoopOperation(operation);
307         operation.setSubRequestId(UUID.randomUUID().toString());
308         operation.setStart(Instant.now());
309         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
310
311         // both start and end times
312         operation = new ControlLoopOperation(operation);
313         operation.setSubRequestId(UUID.randomUUID().toString());
314         operation.setEnd(Instant.now());
315         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
316
317         // only end time
318         operation = new ControlLoopOperation(operation);
319         operation.setSubRequestId(UUID.randomUUID().toString());
320         operation.setStart(null);
321         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
322
323         runThread();
324
325         // all of them should have been stored
326         assertEquals(4, mgr.getRecordsCommitted());
327
328         // each was unique
329         assertEquals(4, mgr.getRecordsInserted());
330         assertEquals(0, mgr.getRecordsUpdated());
331     }
332
333     /**
334      * Tests storeRecord() when records are updated.
335      */
336     @Test
337     public void testStoreRecordUpdate() throws InterruptedException {
338         /*
339          * Note: we do NOT change sub-request ID, so that records all refer to the same DB
340          * record.
341          */
342
343         // no start time
344         operation.setStart(null);
345         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
346
347         // no end time
348         operation = new ControlLoopOperation(operation);
349         operation.setStart(Instant.now());
350         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
351
352         // both start and end times
353         operation = new ControlLoopOperation(operation);
354         operation.setEnd(Instant.now());
355         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
356
357         // only end time
358         operation = new ControlLoopOperation(operation);
359         operation.setStart(null);
360         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
361
362         runThread();
363
364         // all of them should have been stored
365         assertEquals(4, mgr.getRecordsCommitted());
366
367         // only one new record
368         assertEquals(1, mgr.getRecordsInserted());
369
370         // remainder were updates
371         assertEquals(3, mgr.getRecordsUpdated());
372     }
373
374     private void runThread() throws InterruptedException {
375         if (threadFunction == null) {
376             return;
377         }
378
379         Thread thread2 = new Thread(() -> {
380             threadFunction.accept(emfSpy);
381             finished.countDown();
382         });
383
384         thread2.setDaemon(true);
385         thread2.start();
386
387         mgr.stop();
388
389         assertTrue(finished.await(5, TimeUnit.SECONDS));
390     }
391
392     private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
393         // @formatter:off
394         return OperationHistoryDataManagerParams.builder()
395                         .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
396                         .dbType("H2")
397                         .driver("org.h2.Driver")
398                         .userName("sa")
399                         .password("")
400                         .batchSize(BATCH_SIZE)
401                         .maxQueueLength(MAX_QUEUE_LENGTH);
402         // @formatter:on
403     }
404
405     /**
406      * Manager that uses the shared DB.
407      */
408     private class SharedDb extends OperationHistoryDataManagerImpl {
409         public SharedDb() {
410             super(params);
411         }
412
413         @Override
414         protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
415             // re-use the same factory to avoid re-creating the DB for each test
416             return emfSpy;
417         }
418     }
419
420     /**
421      * Manager that uses the shared DB and a pseudo thread.
422      */
423     private class PseudoThread extends SharedDb {
424
425         @Override
426         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
427             threadFunction = command;
428             return thread;
429         }
430     }
431
432     /**
433      * Manager that uses the shared DB and catches the thread.
434      */
435     private class RealThread extends SharedDb {
436
437         @Override
438         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
439             thread = super.makeThread(emfactory, command);
440             return thread;
441         }
442     }
443 }