Upgrade Java 17 in policy-drools-apps
[policy/drools-applications.git] / controlloop / common / eventmanager / src / test / java / org / onap / policy / controlloop / ophistory / OperationHistoryDataManagerImplTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.ophistory;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.awaitility.Awaitility.await;
26 import static org.junit.jupiter.api.Assertions.assertEquals;
27 import static org.junit.jupiter.api.Assertions.assertTrue;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import jakarta.persistence.EntityManagerFactory;
36 import java.time.Instant;
37 import java.util.Properties;
38 import java.util.UUID;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.function.Consumer;
43 import org.junit.jupiter.api.AfterAll;
44 import org.junit.jupiter.api.AfterEach;
45 import org.junit.jupiter.api.BeforeAll;
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.onap.policy.controlloop.ControlLoopOperation;
49 import org.onap.policy.controlloop.VirtualControlLoopEvent;
50 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
51
52 class OperationHistoryDataManagerImplTest {
53
54     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
55     private static final String MY_LOOP_NAME = "my-loop-name";
56     private static final String MY_ACTOR = "my-actor";
57     private static final String MY_OPERATION = "my-operation";
58     private static final String MY_TARGET = "my-target";
59     private static final String MY_ENTITY = "my-entity";
60     private static final String REQ_ID = "my-request-id";
61     private static final int BATCH_SIZE = 5;
62     private static final int MAX_QUEUE_LENGTH = 23;
63
64     private static EntityManagerFactory emf;
65
66     private Thread thread = mock(Thread.class);
67
68     private OperationHistoryDataManagerParams params;
69     private Consumer<EntityManagerFactory> threadFunction;
70     private VirtualControlLoopEvent event;
71     private ControlLoopOperation operation;
72     private EntityManagerFactory emfSpy;
73
74     // decremented when the thread function completes
75     private CountDownLatch finished;
76
77     private OperationHistoryDataManagerImpl mgr;
78
79
80     /**
81      * Sets up for all tests.
82      */
83     @BeforeAll
84     public static void setUpBeforeClass() {
85         var params = makeBuilder().build();
86
87         // capture the entity manager factory for re-use
88         new OperationHistoryDataManagerImpl(params) {
89             @Override
90             protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
91                 emf = super.makeEntityManagerFactory(opsHistPu, props);
92                 return emf;
93             }
94         };
95     }
96
97     /**
98      * Restores the environment after all tests.
99      */
100     @AfterAll
101     public static void tearDownAfterClass() {
102         emf.close();
103     }
104
105     /**
106      * Sets up for an individual test.
107      */
108     @BeforeEach
109     public void setUp() {
110         event = new VirtualControlLoopEvent();
111         event.setClosedLoopControlName(MY_LOOP_NAME);
112         event.setRequestId(UUID.randomUUID());
113
114         operation = new ControlLoopOperation();
115         operation.setActor(MY_ACTOR);
116         operation.setOperation(MY_OPERATION);
117         operation.setTarget(MY_TARGET);
118         operation.setSubRequestId(UUID.randomUUID().toString());
119
120         threadFunction = null;
121         finished = new CountDownLatch(1);
122
123         // prevent the "real" emf from being closed
124         emfSpy = spy(emf);
125         doAnswer(ans -> null).when(emfSpy).close();
126
127         params = makeBuilder().build();
128
129         mgr = new PseudoThread();
130         mgr.start();
131     }
132
133     @AfterEach
134     public void tearDown() {
135         mgr.stop();
136     }
137
138     @Test
139     void testConstructor() {
140         // use a thread and manager that haven't been started yet
141         thread = mock(Thread.class);
142         mgr = new PseudoThread();
143
144         // should not start the thread before start() is called
145         verify(thread, never()).start();
146
147         mgr.start();
148
149         // should have started the thread
150         verify(thread).start();
151
152         // invalid properties
153         params.setUrl(null);
154         assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
155                         .hasMessageContaining("data-manager-properties");
156     }
157
158     @Test
159     void testStart() {
160         // this should have no effect
161         mgr.start();
162
163         mgr.stop();
164
165         // this should also have no effect
166         assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
167     }
168
169     @Test
170     void testStore_testStop() throws InterruptedException {
171         // store
172         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
173
174         runThread();
175
176         assertEquals(1, mgr.getRecordsCommitted());
177     }
178
179     /**
180      * Tests stop() when the manager isn't running.
181      */
182     @Test
183     void testStopNotRunning() {
184         // use a manager that hasn't been started yet
185         mgr = new PseudoThread();
186         mgr.stop();
187
188         verify(emfSpy).close();
189     }
190
191     /**
192      * Tests store() when it is already stopped.
193      */
194     @Test
195     void testStoreAlreadyStopped() throws InterruptedException {
196         mgr.stop();
197
198         // store
199         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
200
201         assertEquals(0, mgr.getRecordsCommitted());
202     }
203
204     /**
205      * Tests store() when when the queue is full.
206      */
207     @Test
208     void testStoreTooManyItems() throws InterruptedException {
209         final int nextra = 5;
210         for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
211             mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
212         }
213
214         runThread();
215
216         assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsCommitted());
217     }
218
219     @Test
220     void testRun() throws InterruptedException {
221
222         // trigger thread shutdown when it completes this batch
223         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
224             mgr.stop();
225             return emf.createEntityManager();
226         });
227
228
229         mgr = new RealThread();
230         mgr.start();
231
232         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
233         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
234         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
235
236         waitForThread();
237
238         verify(emfSpy).close();
239
240         assertEquals(3, mgr.getRecordsCommitted());
241     }
242
243     private void waitForThread() {
244         await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
245     }
246
247     /**
248      * Tests run() when the entity manager throws an exception.
249      */
250     @Test
251     void testRunException() throws InterruptedException {
252         var count = new AtomicInteger(0);
253
254         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
255             if (count.incrementAndGet() == 2) {
256                 // interrupt during one of the attempts
257                 thread.interrupt();
258             }
259
260             // throw an exception for each record
261             throw EXPECTED_EXCEPTION;
262         });
263
264
265         mgr = new RealThread();
266         mgr.start();
267
268         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
269         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
270         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
271
272         waitForThread();
273
274         verify(emfSpy).close();
275     }
276
277     /**
278      * Tests storeRemainingRecords() when the entity manager throws an exception.
279      */
280     @Test
281     void testStoreRemainingRecordsException() throws InterruptedException {
282         // arrange to throw an exception
283         when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
284
285         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
286
287         runThread();
288     }
289
290     @Test
291     void testStoreRecord() throws InterruptedException {
292         /*
293          * Note: we change sub-request ID each time to guarantee that the records are
294          * unique.
295          */
296
297         // no start time
298         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
299
300         // no end time
301         operation = new ControlLoopOperation(operation);
302         operation.setSubRequestId(UUID.randomUUID().toString());
303         operation.setStart(Instant.now());
304         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
305
306         // both start and end times
307         operation = new ControlLoopOperation(operation);
308         operation.setSubRequestId(UUID.randomUUID().toString());
309         operation.setEnd(Instant.now());
310         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
311
312         // only end time
313         operation = new ControlLoopOperation(operation);
314         operation.setSubRequestId(UUID.randomUUID().toString());
315         operation.setStart(null);
316         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
317
318         runThread();
319
320         // all of them should have been stored
321         assertEquals(4, mgr.getRecordsCommitted());
322
323         // each was unique
324         assertEquals(4, mgr.getRecordsInserted());
325         assertEquals(0, mgr.getRecordsUpdated());
326     }
327
328     /**
329      * Tests storeRecord() when records are updated.
330      */
331     @Test
332     void testStoreRecordUpdate() throws InterruptedException {
333         /*
334          * Note: we do NOT change sub-request ID, so that records all refer to the same DB
335          * record.
336          */
337
338         // no start time
339         operation.setStart(null);
340         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
341
342         // no end time
343         operation = new ControlLoopOperation(operation);
344         operation.setStart(Instant.now());
345         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
346
347         // both start and end times
348         operation = new ControlLoopOperation(operation);
349         operation.setEnd(Instant.now());
350         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
351
352         // only end time
353         operation = new ControlLoopOperation(operation);
354         operation.setStart(null);
355         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
356
357         runThread();
358
359         // all of them should have been stored
360         assertEquals(4, mgr.getRecordsCommitted());
361
362         // only one new record
363         assertEquals(1, mgr.getRecordsInserted());
364
365         // remainder were updates
366         assertEquals(3, mgr.getRecordsUpdated());
367     }
368
369     private void runThread() throws InterruptedException {
370         if (threadFunction == null) {
371             return;
372         }
373
374         var thread2 = new Thread(() -> {
375             threadFunction.accept(emfSpy);
376             finished.countDown();
377         });
378
379         thread2.setDaemon(true);
380         thread2.start();
381
382         mgr.stop();
383
384         assertTrue(finished.await(5, TimeUnit.SECONDS));
385     }
386
387     private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
388         // @formatter:off
389         return OperationHistoryDataManagerParams.builder()
390                         .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
391                         .dbType("H2")
392                         .driver("org.h2.Driver")
393                         .userName("sa")
394                         .password("")
395                         .batchSize(BATCH_SIZE)
396                         .maxQueueLength(MAX_QUEUE_LENGTH);
397         // @formatter:on
398     }
399
400     /**
401      * Manager that uses the shared DB.
402      */
403     private class SharedDb extends OperationHistoryDataManagerImpl {
404         public SharedDb() {
405             super(params);
406         }
407
408         @Override
409         protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
410             // re-use the same factory to avoid re-creating the DB for each test
411             return emfSpy;
412         }
413     }
414
415     /**
416      * Manager that uses the shared DB and a pseudo thread.
417      */
418     private class PseudoThread extends SharedDb {
419
420         @Override
421         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
422             threadFunction = command;
423             return thread;
424         }
425     }
426
427     /**
428      * Manager that uses the shared DB and catches the thread.
429      */
430     private class RealThread extends SharedDb {
431
432         @Override
433         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
434             thread = super.makeThread(emfactory, command);
435             return thread;
436         }
437     }
438 }