a1fd85a2375bdc98524c18f2a748ef0debc0dce8
[policy/drools-applications.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.controlloop.ophistory;
23
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.awaitility.Awaitility.await;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertTrue;
28 import static org.mockito.Mockito.doAnswer;
29 import static org.mockito.Mockito.mock;
30 import static org.mockito.Mockito.never;
31 import static org.mockito.Mockito.spy;
32 import static org.mockito.Mockito.verify;
33 import static org.mockito.Mockito.when;
34
35 import jakarta.persistence.EntityManagerFactory;
36 import java.time.Instant;
37 import java.util.Properties;
38 import java.util.UUID;
39 import java.util.concurrent.CountDownLatch;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.function.Consumer;
43 import org.junit.After;
44 import org.junit.AfterClass;
45 import org.junit.Before;
46 import org.junit.BeforeClass;
47 import org.junit.Test;
48 import org.mockito.Mock;
49 import org.onap.policy.controlloop.ControlLoopOperation;
50 import org.onap.policy.controlloop.VirtualControlLoopEvent;
51 import org.onap.policy.controlloop.ophistory.OperationHistoryDataManagerParams.OperationHistoryDataManagerParamsBuilder;
52
53 public class OperationHistoryDataManagerImplTest {
54
55     private static final IllegalStateException EXPECTED_EXCEPTION = new IllegalStateException("expected exception");
56     private static final String MY_LOOP_NAME = "my-loop-name";
57     private static final String MY_ACTOR = "my-actor";
58     private static final String MY_OPERATION = "my-operation";
59     private static final String MY_TARGET = "my-target";
60     private static final String MY_ENTITY = "my-entity";
61     private static final String REQ_ID = "my-request-id";
62     private static final int BATCH_SIZE = 5;
63     private static final int MAX_QUEUE_LENGTH = 23;
64
65     private static EntityManagerFactory emf;
66
67     private Thread thread = mock(Thread.class);
68
69     private OperationHistoryDataManagerParams params;
70     private Consumer<EntityManagerFactory> threadFunction;
71     private VirtualControlLoopEvent event;
72     private ControlLoopOperation operation;
73     private EntityManagerFactory emfSpy;
74
75     // decremented when the thread function completes
76     private CountDownLatch finished;
77
78     private OperationHistoryDataManagerImpl mgr;
79
80
81     /**
82      * Sets up for all tests.
83      */
84     @BeforeClass
85     public static void setUpBeforeClass() {
86         OperationHistoryDataManagerParams params = makeBuilder().build();
87
88         // capture the entity manager factory for re-use
89         new OperationHistoryDataManagerImpl(params) {
90             @Override
91             protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
92                 emf = super.makeEntityManagerFactory(opsHistPu, props);
93                 return emf;
94             }
95         };
96     }
97
98     /**
99      * Restores the environment after all tests.
100      */
101     @AfterClass
102     public static void tearDownAfterClass() {
103         emf.close();
104     }
105
106     /**
107      * Sets up for an individual test.
108      */
109     @Before
110     public void setUp() {
111         event = new VirtualControlLoopEvent();
112         event.setClosedLoopControlName(MY_LOOP_NAME);
113         event.setRequestId(UUID.randomUUID());
114
115         operation = new ControlLoopOperation();
116         operation.setActor(MY_ACTOR);
117         operation.setOperation(MY_OPERATION);
118         operation.setTarget(MY_TARGET);
119         operation.setSubRequestId(UUID.randomUUID().toString());
120
121         threadFunction = null;
122         finished = new CountDownLatch(1);
123
124         // prevent the "real" emf from being closed
125         emfSpy = spy(emf);
126         doAnswer(ans -> null).when(emfSpy).close();
127
128         params = makeBuilder().build();
129
130         mgr = new PseudoThread();
131         mgr.start();
132     }
133
134     @After
135     public void tearDown() {
136         mgr.stop();
137     }
138
139     @Test
140     public void testConstructor() {
141         // use a thread and manager that haven't been started yet
142         thread = mock(Thread.class);
143         mgr = new PseudoThread();
144
145         // should not start the thread before start() is called
146         verify(thread, never()).start();
147
148         mgr.start();
149
150         // should have started the thread
151         verify(thread).start();
152
153         // invalid properties
154         params.setUrl(null);
155         assertThatCode(() -> new PseudoThread()).isInstanceOf(IllegalArgumentException.class)
156                         .hasMessageContaining("data-manager-properties");
157     }
158
159     @Test
160     public void testStart() {
161         // this should have no effect
162         mgr.start();
163
164         mgr.stop();
165
166         // this should also have no effect
167         assertThatCode(() -> mgr.start()).doesNotThrowAnyException();
168     }
169
170     @Test
171     public void testStore_testStop() throws InterruptedException {
172         // store
173         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
174
175         runThread();
176
177         assertEquals(1, mgr.getRecordsCommitted());
178     }
179
180     /**
181      * Tests stop() when the manager isn't running.
182      */
183     @Test
184     public void testStopNotRunning() {
185         // use a manager that hasn't been started yet
186         mgr = new PseudoThread();
187         mgr.stop();
188
189         verify(emfSpy).close();
190     }
191
192     /**
193      * Tests store() when it is already stopped.
194      */
195     @Test
196     public void testStoreAlreadyStopped() throws InterruptedException {
197         mgr.stop();
198
199         // store
200         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
201
202         assertEquals(0, mgr.getRecordsCommitted());
203     }
204
205     /**
206      * Tests store() when when the queue is full.
207      */
208     @Test
209     public void testStoreTooManyItems() throws InterruptedException {
210         final int nextra = 5;
211         for (int nitems = 0; nitems < MAX_QUEUE_LENGTH + nextra; ++nitems) {
212             mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
213         }
214
215         runThread();
216
217         assertEquals(MAX_QUEUE_LENGTH, mgr.getRecordsCommitted());
218     }
219
220     @Test
221     public void testRun() throws InterruptedException {
222
223         // trigger thread shutdown when it completes this batch
224         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
225             mgr.stop();
226             return emf.createEntityManager();
227         });
228
229
230         mgr = new RealThread();
231         mgr.start();
232
233         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
234         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
235         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
236
237         waitForThread();
238
239         verify(emfSpy).close();
240
241         assertEquals(3, mgr.getRecordsCommitted());
242     }
243
244     private void waitForThread() {
245         await().atMost(5, TimeUnit.SECONDS).until(() -> !thread.isAlive());
246     }
247
248     /**
249      * Tests run() when the entity manager throws an exception.
250      */
251     @Test
252     public void testRunException() throws InterruptedException {
253         AtomicInteger count = new AtomicInteger(0);
254
255         when(emfSpy.createEntityManager()).thenAnswer(ans -> {
256             if (count.incrementAndGet() == 2) {
257                 // interrupt during one of the attempts
258                 thread.interrupt();
259             }
260
261             // throw an exception for each record
262             throw EXPECTED_EXCEPTION;
263         });
264
265
266         mgr = new RealThread();
267         mgr.start();
268
269         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
270         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
271         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
272
273         waitForThread();
274
275         verify(emfSpy).close();
276     }
277
278     /**
279      * Tests storeRemainingRecords() when the entity manager throws an exception.
280      */
281     @Test
282     public void testStoreRemainingRecordsException() throws InterruptedException {
283         // arrange to throw an exception
284         when(emfSpy.createEntityManager()).thenThrow(EXPECTED_EXCEPTION);
285
286         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
287
288         runThread();
289     }
290
291     @Test
292     public void testStoreRecord() throws InterruptedException {
293         /*
294          * Note: we change sub-request ID each time to guarantee that the records are
295          * unique.
296          */
297
298         // no start time
299         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
300
301         // no end time
302         operation = new ControlLoopOperation(operation);
303         operation.setSubRequestId(UUID.randomUUID().toString());
304         operation.setStart(Instant.now());
305         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
306
307         // both start and end times
308         operation = new ControlLoopOperation(operation);
309         operation.setSubRequestId(UUID.randomUUID().toString());
310         operation.setEnd(Instant.now());
311         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
312
313         // only end time
314         operation = new ControlLoopOperation(operation);
315         operation.setSubRequestId(UUID.randomUUID().toString());
316         operation.setStart(null);
317         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
318
319         runThread();
320
321         // all of them should have been stored
322         assertEquals(4, mgr.getRecordsCommitted());
323
324         // each was unique
325         assertEquals(4, mgr.getRecordsInserted());
326         assertEquals(0, mgr.getRecordsUpdated());
327     }
328
329     /**
330      * Tests storeRecord() when records are updated.
331      */
332     @Test
333     public void testStoreRecordUpdate() throws InterruptedException {
334         /*
335          * Note: we do NOT change sub-request ID, so that records all refer to the same DB
336          * record.
337          */
338
339         // no start time
340         operation.setStart(null);
341         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
342
343         // no end time
344         operation = new ControlLoopOperation(operation);
345         operation.setStart(Instant.now());
346         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
347
348         // both start and end times
349         operation = new ControlLoopOperation(operation);
350         operation.setEnd(Instant.now());
351         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
352
353         // only end time
354         operation = new ControlLoopOperation(operation);
355         operation.setStart(null);
356         mgr.store(REQ_ID, event.getClosedLoopControlName(), event, MY_ENTITY, operation);
357
358         runThread();
359
360         // all of them should have been stored
361         assertEquals(4, mgr.getRecordsCommitted());
362
363         // only one new record
364         assertEquals(1, mgr.getRecordsInserted());
365
366         // remainder were updates
367         assertEquals(3, mgr.getRecordsUpdated());
368     }
369
370     private void runThread() throws InterruptedException {
371         if (threadFunction == null) {
372             return;
373         }
374
375         Thread thread2 = new Thread(() -> {
376             threadFunction.accept(emfSpy);
377             finished.countDown();
378         });
379
380         thread2.setDaemon(true);
381         thread2.start();
382
383         mgr.stop();
384
385         assertTrue(finished.await(5, TimeUnit.SECONDS));
386     }
387
388     private static OperationHistoryDataManagerParamsBuilder makeBuilder() {
389         // @formatter:off
390         return OperationHistoryDataManagerParams.builder()
391                         .url("jdbc:h2:mem:" + OperationHistoryDataManagerImplTest.class.getSimpleName())
392                         .dbType("H2")
393                         .driver("org.h2.Driver")
394                         .userName("sa")
395                         .password("")
396                         .batchSize(BATCH_SIZE)
397                         .maxQueueLength(MAX_QUEUE_LENGTH);
398         // @formatter:on
399     }
400
401     /**
402      * Manager that uses the shared DB.
403      */
404     private class SharedDb extends OperationHistoryDataManagerImpl {
405         public SharedDb() {
406             super(params);
407         }
408
409         @Override
410         protected EntityManagerFactory makeEntityManagerFactory(String opsHistPu, Properties props) {
411             // re-use the same factory to avoid re-creating the DB for each test
412             return emfSpy;
413         }
414     }
415
416     /**
417      * Manager that uses the shared DB and a pseudo thread.
418      */
419     private class PseudoThread extends SharedDb {
420
421         @Override
422         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
423             threadFunction = command;
424             return thread;
425         }
426     }
427
428     /**
429      * Manager that uses the shared DB and catches the thread.
430      */
431     private class RealThread extends SharedDb {
432
433         @Override
434         protected Thread makeThread(EntityManagerFactory emfactory, Consumer<EntityManagerFactory> command) {
435             thread = super.makeThread(emfactory, command);
436             return thread;
437         }
438     }
439 }